Агрегатор Telegram барахолок с нуля. Технический разбор бэкенда и проблем

от автора

Привет, Хабр!

Начиная думать о новом пет-проекте, мне хотелось решить какую-нибудь проблему, которая болит не только мне. Взгляд пал на барахолки в телеграм чатах Грузии, а именно потребность в поиске нужных вещей там без постоянного мониторинга десятка групп. Казалось бы, не так и сложно, но на деле я столкнулся с весьма интересными подводными камнями, о решении которых и хотелось бы тут рассказать.

Статья — технический разбор моей попытки построить сеть каналов-барахолок в Telegram, которая включала бы в себя все объявления уже существующих и устранила фактор хаоса (спам, коммерция, мусорные сообщения). А так же поверх каналов — бота с подпиской на объявления по ключевыем словам.

Проблема

На каждый город есть N чатов-барахолок, куда люди постят сообщения о продаже. Кто стол продаёт, кто коптер, а кто-то пытается и запрещёнку. С последними худо-бедно справляются админы этих барахолок, однако всё это выглядит очень хаотично, с кучей спама и рекламы, какими-то внутренними правилами. Поверх всего этого — боты, которые просят новых людей пройти капчи и тд. Вся эта вакханалия мелькает/исчезает — раздражает одним словом. Какие-то пользователи при продаже закидывают объявление в один чат, кто-то во все сразу, кто-то по 10 раз в неделю одно и то же продаёт — следить за этим всем утомительно.

Вот бы отмыть этих детей и автоматизировать!

Задача

  1. Создать по одному каналу-барахолке на город, куда будут попадать уникальные объявления и по пути вырезать ботов, спамеров, коммерцию и простые оффтоп-сообщения.

  2. Создать бота, в котором можно подписаться на N городов и получать только релевантные объявления по ключевым словам.

Как видится решение с глаз пользователя

Пользователь джойнится в один канал и мониторит так же как он делал это в N группах до этого.

Пользователь заходит в бота, выбирает город и создаёт фильтр с ключевыми словами вида «компьютерный стол, велосипед*, iphone«, (звёздочка означала бы частичное совпадение) и с этого момента получает релевантные объявления по ключевым словам.

Как видится техническое решение

  • Каким-то образом забираем новые сообщения со всех нужны групп (веб-скраппинг? боты? telegram client api?)

  • Продумываем систему фильтрации и спама

  • Валидируем, проверяем на дубликаты и постим в созданный нами тг канал

  • Создаём бота, в котором спрашиваем пользователя что он хотел бы найти, держим сессию разговора, продумываем UX/UI, реагируем на блокировки, обрабатываем эджкейсы и все другие аттрибуты работы с пользователем через интерфейс тг.

  • Как-то пытаемся качественно сматчить ключевые слова и само объявление (банальный Equal/Contains? Эластик? Морфологический разбор? AI?)

  • Стараемся выбирать экономически-целесообразные подсистемы/технологии, пет-проект всё же. Т.к. в обиходе уже были Postgres, Redis и скрипты для деплоя на DigitalOcean с предыдущих пет-проектов — выбор был очевиден. +DynamoDB для сессий пользователя, а SQS как недорогая и надёжная FIFO очередь с фишками дедупликации и группировки по GroupId.

Технический стек

Техническую сторону в целом собрал из того, на чём уже наработана рука.

Язык: .net 8 (C#) / Node.JS (Typescript)
Очередь: AWS SQS (будем использовать как транспорт + балансер нагрузки)
Кеш: Redis (будем там хранить историю объявлений, картинки и юзать pub/sub для системных ивентов)
БД: PostgreSQL (пользователи бота, конфигурации), DynamoDB (сессии юзеров в боте)
Логи/Метрики/Графики: NewRelic
Ну и до кучи : Docker+Dockerhub, Telegram Bot API, Telegram Client API, чуточку ИИ (openAI)

Верхнеуровневая архитектура

По итогу получилось 6 микросервисов. Мне было интересно скомпоновать их таким образом, чтобы в теоретическом будущем можно было их легко скейлить горизонтально в нужном количестве в зависимости от задачи. Какие-то типы задач запускались на тех же микросервисах в отдельных потоках, но написаны были таким образом чтобы быть легко вынесены в отдельный независимый сервис. Модульность — наше всё.

Какие микросервисы получились в итоге :

  1. Микросервис для скраппинга сырых сообщений с чатов-барахолок и отправки в очередь

  2. Микросервис для порционной обработки сырых сообщений, фильтрации

  3. Микросервис для отправки в Telegram готовых сообщений

  4. Микросервис для получения и обработки сообщений из Telegram бота и отправки в очередь

  5. Микросервис для матча новых сообщений и пользовательских фильтров

  6. Микросервис для отложенных заданий (обработка ключевых слов через openai и подбор истории уже существующих объяв . Это асинхронные операции)

    Выгляди как-то так :

    Диаграмма сервисов и взаимодействия

    Диаграмма сервисов и взаимодействия

Каждый микросервис обёрнут в докер, облеплен логгерами и обработкой ошибок. Работает независимо и выполняет одну-две задачи, деплой автоматизирован на уровне bash команд, все сервисы хостятся на одной ноде в DigitalOcean в разных контейнерах (в целях экономии средств).

Деплой выглядит как локальный билд и сборка образа -> push в dockerhub и конфигов в S3 -> connect к инстансу по ssh -> команда для остановки текущего контейнера и деплоя нового через gracefull shutdown. Downtime около 5 секунд для скраппера, все остальные без простоя за счёт очередей.

Notes :

  1. Многое можно было легко завернуть в лямбды, выбор в пользу такого способа хостинга и развёртки — личное предпочтение.

  2. В DynamoDB хранятся только сессии пользователей от чат-бота, можно легко было переместить в Redis.

  3. В PostgreSQL храню базовую информацию о пользователе бота, созданные им фильтры для подписки и ключевые слова для поиска. Так же там лежит конфигурация всех связей каналов и чатов, всё это вынимается один раз и локально кешируется, т.к. очень редко меняется.

  4. В Redis хранятся хэши картинок и текстов за последние с TTL 14 дней — чуть ниже зачем. Ну и по мелочи.

  5. CI/CD осмысленно не прикручивал — вся команда состоит из одного человека поэтому скип.

Telegram Client API

Первой задачей был парсинг N групп PER CITY. В моём случае это около 5-7 групп на каждый город. Обычно формат объявления выглядит как 0-5 картинок и текстовое описание. Обычного бота для отслеживания туда не добавишь, пришлось осваивать Клиент. Я взял самый популярный под .net и начал изучать API.

В целом, всё оказалось не так и сложно, в репозитории есть примеры коннекта/получения сообщений. Процесс таков :

Регистрируете свой app, получаете appId и apiHash. У меня как раз была вторая симка на которую можно было создать такой апп. Учтите, что Telegram очень чувствителен к качеству «истории» номера при использовании в качестве Клиента и может легко вас забанить если ему покажется, что вы абьюзите лимиты. Там нет чётких критериев — все лишь гадают на форумах.

Начинаем получать по MTProto протоколу данные, либа всё распаковывает и вы ловите сообщения с нужных каналов.

Подводные камни

  1. Лимиты Telegram Client API

    Сперва мне показалось, что неплохой идеей будет создать промежуточный чат, в который добавить обычного бота и через Client просто туда делать Forward нужных сообщений. Воткнулся в лимиты. Стоит сказать, что api телеграма со всех сторон закрыты этими лимитами. Практически каждый метод, который вы вызываете (дёргаете сервер).

    Если сказать по-простому — GET запросы имеют самые широкие лимиты, все модифицирующие/меняющие/делающие что-то — довольно жёстко лимитированы. Сначала вы будете получать специфически FLOOD_WAIT_X, где X — кол-во секунд, которое нужно подождать до след запроса. Если будете игнорировать — вас забанят. Поэтому старайте по-максимуму следить за ошибками — может стоить вам аккаунта (в тематических чатах и форумах частая история).

    Вот тут, тут и тут можно отыскать больше информации. В офф документации к сожалению не всё прописано.

    Продолжать тему о промежуточном чате я думаю не стоит — из-за этих самых лимитов на форвард сообщений. Да и идея изначально — костыль. Не прокатило, вычёркиваем.

  2. Внутреннее устройство объектов в MTProto

    Пришлось разбираться с внутренней структурой объектов. Хоть в либе всё и обёрнуто — там есть некоторое количество абстракций, которые как я понял — «фишка» протокола, которые немного … странные на первые взгляд. От разных методов вам может приходить допустим один абстрактный объект UserBase и будут доступны только базовые проперти пользователя, а чтобы получить расширенные — вам нужен секьюрити хэш, которые берутся из другого места. Во внутренностях там можно ногу сломать, но так или иначе вам придётся разобраться с этим, если хотите вытащить, допустим никнейм пользователя. Тут примеры приводить не буду, пишите в лс если совсем в тупик заедете 😉

  3. Пришлось делать временной буфер между приходом сообщений и отправкой их на дальнейшую обработку, т.к. :

    1. С точки зрения протокола 5 картинок и текст — это 5 разных сообщений (текст прикрепляется к одному из фото), связанных между собой по внутреннему groupId. Приходит всё асинхронно. Вы сами должны отслеживать и склеивать их.

    2. Пользователь может запостить и удалить сообщение. Бот может удалить. Админ может удалить. Всё что удаляется — не должно попадать к нам, поэтому мы в том числе должны следить и за удаляемыми объектами.

      И вот тут нас коварный сюрприз.

      Я голову сломал пока пытался понять почему в исходной группе сообщение удалено, а у меня оно не помечено как удалённое. Оказалось, не туда смотрел. Где-то на просторах документации к Telethon (Python обёртка над MTProto) я вычитал, что Telegram не гарантирует пуш этого event’а не-официальным клиентам (ну то есть вам). Вот такая вот свобода и равенство среди офф и алтернативных клиентов 🙂 Решения я пока не нашёл, однако процент «неприходов» около 5%, не критично, но неприятно.

    3. Определенный процент пользователей сначала кидает фотографии, а уже потом пишет описание. Или наоборот. И это не редкий сценарий, пришлось обрабатывать.

  4. Иногда у вас отлетает сессия. Иногда серверы недоступны. И еще много интересных ошибок, с которыми вы ничего сделать не можете, кроме как наблюдать и внедрить Retry-Policy везде, где это потребуется.

  5. Мне конечно же нужны были картинки для «перепоста». Проблема в том, что вы должны забирать её тут же, т.к. вместе с ивентом к вам приходит и определенного вида access_hash и вы не можете по этому file_id забрать другим обычным ботом например и переиспользовать. Сесурити. Не проблема. Выкачиваем и сторим какое-то время в памяти.

Результат : собираем сообщения с разных чатов, склеиваем, выжидаем 5 минут и посылаем в следующий сервис. Скачиваем картинки к себе, кладём в base64 (+30% к весу) и пулим в редис с небольшим TTL.

Вскрылся небольшой нюанс — часто люди/админы/боты удаляют объяление и спустя 20 и 50 минут а то и неделю. Пришлось обрабатывать. Ловишь -> отправляешь так же в микросервис, но с пометкой на удаление. И там уже сервис найдёт его в «опубликованных» и удалит. Для этого сохраняем связь {child_chat_id_msg_id — posted_to_channel_id_msg_id} и потом по ключу в кеше ищем за O(1).

Недавно на подкасте Радио-Т услышал шутку :

90% запланированного времени на разработку занимает разработка, а уже оставшиеся 90% — на разработку.

Подтверждаю. Всё так — на все «шлифовки» уходит прорва времени. Но мне кажется так и должно быть, т.к. хороший продукт в итоге и отличается в этих 10%, которые важны ровно настолько же, насколько и первые 90.

Telegram Bot API

Логичным будет сказать пару слов о боте, который написан на ванильном (обычном) Telegram Bot Api. Тут всё довольно понятнее, т.к. именно с такими ботами вы взаимодействуете в чатах и API более дружелюбно к разработчику. Client — всё же изначально создан для альтернативных клиентов, порог входа чуть выше.

Функционал нашего же бота — поприветствовать пользователя, выкатить список возможных действий (создать фильтр, написать в саппорт) ну и возможность в ответ асинхронно прислать релевантное объявление. Плюс всё это как-то красиво обрамить.

Тут главным вопросом был как найти какой-нибудь фреймворк, который за меня будет сохранять state разговора, чтобы я мог понимать на каком этапе пользователь, т.к. за этим нужно следить именно нам. Я в прошлых проектах писал это руками, не хотелось велосипедить.

Сперва рассматривал решение от Microsoft Bot Framework, но понял, чо он сильно завязан на ажуровский портал — не подходит. Наткнулся на Telegram Bot Framework, который через абстракции в виде Forms позволяет довольно удобно организовать базовый диалог с пользователем + имеет в наборе базовые контролы (список, пагинация, грид, автосокрытие формы и тд). Фреймворк пришлось довольно сильно допиливать под свои паттерны обработки вебхуков и стейт-машину. Но в целом рекомендую — он неплох.

Создаём фильтр с ключевыми словами и кладём в базу. Всё построено на тексте и кнопках. Лезть в mini-apps, которые представлены буквально недавно — не решился, но обязательно попробую в следующих итерациях/пет-проектах, т.к. UX общения через кнопки выглядит уже довольно ретроградно, хоть и лаконично.

Добавлю лишь, что пришлось хорошенько подумать над UX как таковым, т.к. изначально я сделал доволньо «техническо-гиковский» интерфейс, но близкие мне люди, которые были первыми пользователями бота намекнули мне, что не полетит. Пришлось всё упростить максимально. Они были правы — чем проще, тем лучше.

Посмотреть на бота можно тут.

Конечный вид объявления в канале

Конечный вид объявления в канале
А вот так выглядит бот

А вот так выглядит бот

Фильтрация спама

Отдельной задачей стояло отфильтровать уже собранный объявления. На вооружение я взял несколько концепций :

  1. Black list — набор слов и/или регулярок, хранится в базе и гибко может применяться по разным критериям (per channel / per city / per child chat) + бан по userId

  2. UserID пользователя. Я заметил, что 90% спама приходит от пользователей с id > 6 000 000 000, т.е. новых пользователей, что в целом логично. Спамеры регистрируют новые аккаунты, им выдаётся ID — спамят — банят — новые акки. Так и накручивается. Но тут есть конечно же весьма реальный шанс пробанить новых пользователей, поэтому использую лишь как фактор. Второй фактор — кол-во фотографий. Учитывая, что у меня есть делэй в 5 минут — спам чаще всего отсекают боты в дочерних группах, до меня доходит не так много, но всё еще существенно, особенно по ночам, когда админы спят.

Фильтрация дубликатов

Вычисляем хэши всех картинок (от base64) и кладём по {channelId_userId_hash} в Redis. Если человек с девайса грузит в Telegram одни и те же картинки, хэш будет одинаковый. То же самое с текстом — обычно люди не запариваются с изменением, просто кидают одно и то же каждый день. Ставим TTL в 14 дней и вот у нас в целом «дёшево и сердито» работает фильтрация дубликатов. Не идеально, но на данном этапе — пойдёт.

У данного подхода есть минус — если человек меняет цену. Но тут нужен другой подход изначально к обработке текста объявления.

Обработка сообщений через AWS SQS

Отдельно хочется затронуть тему AWS очередей. У них есть такая полезная фича как группировка (groupId), которая отлично ложится на несколько сценариев, которые у нас есть :

  1. Отсылка в определенный канал

  2. Отсылка и получение сообщений от юзеров сообщений

Базово, все консьюмеры очередей устроены следующим образом :

  1. Забираем из очереди N айтемов в локальную очередь и следим чтобы там не было >M (size). M вычисляем опытным путём от загрузки пода.

  2. В фоне крутится ThreadPool, который состоит из NP потоков и разгребает локальную очередь. За раз берем в каждый поток в обработку все имеющиеся сообщения с нужным GroupdID — этим мы гарантируем, что SQS не отдаст другим консьюмерам более новые сообщения пока не будут обработаны по очереди все текущие. Такая логика — в NP потоках.

Тем самым мы решаем несколько задач — параллелим обработку per instance + диверсифицируем обработку и лочим её на groupID, что позволяет нам избежать троттлинга, когда один из producerGroupId забивает собой очередь — остальные «клиенты» не пострадают, троттлится будет только тот самый «нагруженный» продьюсер/юзер. Поэтому важно заранее продумать каким образом группировать сообщения. Где-то такая логика может не подойти и стрельнуть в ногу.

Особенно хорошо это работает в контексте отсылки сообщений в канал/пользователям, т.к. если вы упираетесь в лимиты отправки — можно просто данное сообщение возвращать в очередь с обновлённым «time to wait» параметром (15min max) и все остальные айтемы этого groupId будут залочены на этот же срок, т.к. FIFO нам гарантирует порядок доставки.

Матчинг пользовательских фильтров через AI

Есть поток поступающих сообщений. Есть пользовательские фильтры вида «компьютерный стол, велосипед*, iphone«. И есть текст объявления. Задача — понять, есть ли в тексте что-то из фильтра. На это работает отдельный микросервис.

Первая мысль была прикрутить каким-то образом сюда ElasticSearch, но прикинув фронт работ, а так же отталкиваясь от экономики проекта — оставил на будущее.

Банальным equal тут не пройдёшься (так я сделал в первой версии) — будет много false-positive срабатываний, а т.к. это ключевая особенность проекта — работать должно как минимум хорошо, отбрасываем эту идею (а точнее переписываем).

Дальше мысль пошла в стемминг и морфологию. К сожалению я не нашёл каких-то открытых бесплатных вариантов, которые удовлетворили бы меня по качеству. Есть неплохой проект Natasha, но выбор пал на менее требовательный к ресурсам и времени разработки вариант — модельку от open ai (4o-mini).

В момент создания фильтров в фоне запускается таска, которая бежит к Сэму Альтману и просит её просклонять все слова. Результат пишем в базу рядом с фильтром. По итогу получаем все вариации по которым можно искать в тексте объявления.

Работает весьма данный подход весьма стабильно. Пришлось лишь чуть подтюнить — если слово на английском, короче 4 символов или со звёздочкой на конце — не включаем их в этот процесс и обрабатываем «как есть».

Так же при поиске в тексте закладываем возможность того, что если пользователь ввёл «красный велосипед», то матч по фразе «продам копию красного редкого велосипеда» будет найден. Расстояние между ключевыми словами в N символов заложено в алгоритме поиска.

Вышло в итоге дёшево и с хорошим результатом на выходе — люди находят то что ищут. Есть куда улучшать качество, но для пет-проекта good enough как говорится.

Поиск по истории объявлений

Хоть в Telegram и имеется нативная функция поиска — работает она на удивление не так уж и хорошо, как могла бы. Поэтому я подумал — а чего бы при создании фильтра не подкидывать пользователю релевантные объявления из прошлого.

При создании фильтра мы проходимся по всем объявлениям канала на который подписался пользователь и делаем поиск по схеме выше. Если что-то находим, то асинхронно предлагаем пользователю показать свежайшие 10 и молотим ссылки отдельными сообщениями. Отдельными — чтобы Telegram показывал preview, так пользователю проще на это смотреть.

Историю сообщений храним в Redis (sorted list) две недели. Забираем всё разом. Тут есть интересный нюанс, т.к. нам не нужно хранить эти записи вечно, а TTL можно установить только на ключ (что нам не подходит, т.к. один канал = один ключ), у Redis есть механизм Score для SortedList’а. При добавлении нового объявления в кэш, мы ставим timestamp как score и раз в N времени скедулером в фоне просто чистим в один запрос айтемы, которые старше такого-то времени. Работает как часы.

Деплой

Вместо CI/CD было решено просто запускать деплой локально парой команд. У меня есть заготовка команд для NPM, где я двумя командами деплою новую версию на нужный мне под. Скейлинг не автоматизирован, лишь заложен в архитектуру.

Вот пример такого файла :

  "name": "test",   "private": true,   "version": "1.0.0",   "description": "",   "scripts": {     "build-dev": "(dotnet publish -c Dev)",     "build-prod": "(dotnet publish -c Release)",     "docker-build-dev": "npm run build-dev && docker build -t image-dev-main-bot-update-handler -f Dockerfile.Dev .",     "docker-build-prod": "npm run build-dev && docker build -t image-dev-main-bot-update-handler -f Dockerfile .",     "docker-build-and-run": "npm run docker-build-dev && docker run -dti -p 3002:3002 --name container-dev-main-bot-update-handler image-dev-main-bot-update-handler",     "docker-stop-and-clear": "(docker stop -t 35 container-dev-main-bot-update-handler || exit 0) && (docker rm container-dev-main-bot-update-handler || exit 0)",     "docker-start": "npm run docker-stop-and-clear && npm run docker-build-and-run && npm run docker-attach",     "docker-stop": "npm run docker-stop-and-clear",     "docker-attach": "docker attach --sig-proxy=false --detach-keys=\"ctrl-c\" container-dev-main-bot-update-handler",     "docker-attach-sh": "docker exec -it container-dev-main-bot-update-handler bash",     "docker-push": "node devscripts/package-docker-push.js",     "docker-logs": "docker logs container-dev-main-bot-update-handler",     "s3-push": "aws s3 cp droplet_npm_package s3://{YOUR_S3_BUCKET} --acl public-read --recursive --profile VS_HOME",     "get-package": "wget -N https://{YOUR_S3_BUCKET}/main-bot-update-handler/package.json",     "docker-publish": "npm run build-prod && npm run docker-build-prod && npm run docker-push && npm run s3-push && npm run connect-h1",     "docker-publish-without-connect": "npm run build-prod && npm run docker-build-prod && npm run docker-push && npm run s3-push",     "connect-h1": "ssh -i {YOUR_SSH_KEY} root@{INSTANCE_IP}}"   } }

При вызове npm run docker-publish мы билдим, заливаем на dockerhub версию, а на S3 обновленный конфиг для instance, коннектимся к поду и там одной командой завершаем деплой (забираем обновленный конфиг и имадж). Просто и достаточно автоматизированно. +отдельный рутовый похожий файл чтобы задеплоить сразу все сервисы в рамках Instance.

Дополнительной галочкой для себя я реализовал во всех сервисах Graceful Shutdown подход, т.е. перед убиванием — контейнер ждёт пока микросервис разрешит его убить. В это время в сервисе закрывается приём новых ивентов на обработку, прослушка вебхуков, все потоки отчитываются о завершении (с таймаутом в 30с) через CancellationToken. Как итог мы не теряем сообщения, не вносим разлад в state’ы и мягко завершаем работу микросервиса. Данное решение потребовало определенной дисциплины в слежении за потоками и

Логи

Логи собираются Logstash’ом и отправляются по двум каналам в зависимости от уровней — в спец бота Telegram и NewRelic. Последних выбрал из-за их вкусного бесплатного тарифа, которого мне пока полностью хватает. Там же живут и метрики с дашбордами. Хотелось прикрутить Grafana + Prometheus или какой-нибудь ELK стек, но в этот раз немного упростил себе жизнь 🙂

Выводы

Получился немного заоверинженеренный пет-проект с интересными техническими задачками. Удалось пройти весь цикл продуктовой разработки от дизайна архитектуры, UX, DevOps и до маркетинга с пиаром. Классный опыт.

Пилите пет-проекты — это не только весело, но и добавляет скиллы «широкого» профиля.

Спасибо тем безумным, кто зачем-то всё это прочитал. Если вам было хоть чуть интересно — загляните в мой айтишно-бэкендовый телеграм канал, там уютно.


https://t.me/ITrower — мой авторский Telegram канал «Айтигребец», в котором я делюсь всякой технической годнотой. Был бы рад вас там увидеть 🙂

ps. А еще я сейчас открыт для сотрудничества, поэтому если у вас есть на примете позиция для Backend Senior Node.js разработчика — напишите мне в LinkedIn или на почту Nigrimmist@gmail.com. Да и просто добавляйтесь, будем на связи.

Всем добра!


ссылка на оригинал статьи https://habr.com/ru/articles/900700/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *