Свой агрегатор новостей на python. Телеграм + RSS + новостные сайты (telethon, feedparser, scrapy)

от автора


freepik

Здравствуйте дорогие хабровчане, в этом посте я хочу показать, как написать свой агрегатор новостей. Конечно, сразу становится очевидно, что это очередное изобретение велосипеда, однако анализируя существующие решения я всё время натыкался на камни преткновения. То они слишком медленно обновлялись, то не было нужных мне источников или часто бывало, что вообще ничего не работало без возможности починить. В итоге я написал своё решение.

Автор статьи приторговывает на бирже, и главной мотивацией было собрать все новости по интересующей теме в одном месте, чтобы не мониторить десяток различных источников вручную.

Текст под катом по большей части технический и будет, скорее всего, интересен читателям, которые сами торгуют на бирже и при этом в IT теме, либо тем, кто сам давно хотел написать агрегатор чего-нибудь.

Об агрегаторе новостей я размышлял уже давно. Во время торговли на бирже мне постоянно приходилось мониторить десяток авторитетных источников, особенно это напрягало, когда должна была выйти какая-нибудь новость, которая точно будет влиять на курс цены акций. В такие моменты было особенно сложно и обидно, когда подобную новость я пропускал. В общем, мне нужен был инструмент, с которым я мог бы оставаться в курсе всего.

Чтобы упростить понимание я написал два агрегатора, один — простой, его рассмотрю здесь. Код второго агрегатора, которым я пользуюсь сам, будет приложен в конце статьи. Простой агрегатор, в сущности, является более упрощённой версией сложного.

Основными источниками информации были телеграм каналы и новостные сайты. Для парсинга телеграма я выбрал telethon. Новости с сайтов можно забирать через RSS каналы с помощью feedparser. Однако, не на всех сайтах есть RSS, в этом случае буду парсить сайт напрямую используя scrapy. Полученные новости сливаются в отдельный телеграм канал с указанием источника.

Каждый парсер написан таким образом, чтобы его можно было запустить отдельно от остальных. Это значительно упрощает процесс добавления новых источников, их лучше проверять отдельно, чтобы убедиться в работоспособности. Например, feedparser может не прочитать RSS канал и тогда его придется парсить вручную.

Репозиторий с исходным кодом (простой парсер) — на GitHub.

1. Парсим телеграм канал

Чтобы telethon работал, необходимо для своего телеграм аккаунта создать переменные api_id и api_hash на сайте my.telegram.org и добавить эти параметры в скрипт. При первом запуске telethon сам создаёт файл с названием сессии (в нашем случае это gazp.session), его удалять не нужно, иначе придётся проходить аутентификацию ещё раз при следующем запуске.

Парсер сам по себе очень простой, считывает посты из канала @prime1 и печатает их в консоль, либо отсылает их, если в параметрах метода telegram_parser определена функция send_message_func.

telegram_parser.py

Показать…

from telethon import TelegramClient, events  def telegram_parser(send_message_func=None, loop=None):     '''Телеграм парсер'''      # Параметры из my.telegram.org     api_id = <Твой api_id>     api_hash = <Твой api_hash>      # Канал источник новостей @prime1     channel_source = 'https://t.me/prime1'      # Сессия клиента telethon     session = 'gazp'      client = TelegramClient(session, api_id, api_hash, loop=loop)     client.start()      @client.on(events.NewMessage(chats=channel_source))     async def handler(event):         '''Забирает посты из телеграмм каналов и посылает их в наш канал'''          if send_message_func is None:             print(event.raw_text, '\n')         else:             await send_message_func(f'@prime1\n{event.raw_text}')      return client  if __name__ == "__main__":      client = telegram_parser()      client.run_until_disconnected()

2. Парсим RSS

Примечание: RSS — крайне удобная вещь, в сущности это xml-файл, который мало весит, и в котором нет ничего лишнего. Такой файл сервер отдаёт без лишней нагрузки и клиент может легко его распарсить, имея при этом минимальные задержки для обновления.

Так как парсер забирает каждый раз N новостей, то каждый раз скачиваются старые новости, которые уже были напечатаны/отправлены. Для решения этой проблемы я ввёл очередь posted_q. Ясно, что вообще все сообщения сохранять не вариант, т.к. это потребует много памяти и, в конечном счёте, приведёт к ошибке MemoryError, когда она закончится. Кроме того в большом массиве долго проверять сообщения на повтор.

Таким образом, устаревшие сообщения нужно удалять, а новые сохранять, что и происходит в очереди. Слева в неё входят свежие новости, а справа удаляются старые, т.е. хранится всего N сообщений в моменте. В качестве ключа в очередь сохраняются первые 50 символов от текста новости, что также сделано для ускорения работы скрипта.

Парсер скачивает новости с RSS канала сайта www.rbc.ru.

rss_parser.py

Показать…

import httpx import asyncio from collections import deque import feedparser  async def rss_parser(httpx_client, posted_q,                      n_test_chars, send_message_func=None):     '''Парсер rss ленты'''      rss_link = 'https://rssexport.rbc.ru/rbcnews/news/20/full.rss'      while True:         try:             response = await httpx_client.get(rss_link)         except:             await asyncio.sleep(10)             continue          feed = feedparser.parse(response.text)          for entry in feed.entries[::-1]:             summary = entry['summary']             title = entry['title']              news_text = f'{title}\n{summary}'              head = news_text[:n_test_chars].strip()              if head in posted_q:                 continue              if send_message_func is None:                 print(news_text, '\n')             else:                 await send_message_func(f'rbc.ru\n{news_text}')              posted_q.appendleft(head)          await asyncio.sleep(5)  if __name__ == "__main__":      # Очередь из уже опубликованных постов, чтобы их не дублировать     posted_q = deque(maxlen=20)      # 50 первых символов от текста новости - это ключ для проверки повторений     n_test_chars = 50      httpx_client = httpx.AsyncClient()      asyncio.run(rss_parser(httpx_client, posted_q, n_test_chars))

3. Парсим сайт напрямую

Кастомный парсер работает так же, как и RSS парсер, за тем лишь исключением, что используется scrapy вместо feedparser и скачивается вся страница, в которой кроме новостей ещё есть куча всего. Из-за этого приходится выставлять бо́льшую паузу между обращениями, ведь если слишком активно напрягать сервер, он может и забанить на какое-то время.

Подобный вид скриптов приходится писать, если у новостного сайта нет оперативно обновляемого телеграм и/или RSS канала. Парсер скачивает новости напрямую с сайта www.bcs-express.ru.

bcs_parser.py

Показать…

import httpx import asyncio from collections import deque from scrapy.selector import Selector  async def bcs_parser(httpx_client, posted_q,                             n_test_chars, send_message_func=None):     '''Кастомный парсер сайта bcs-express.ru'''      bcs_link = 'https://bcs-express.ru/category'      while True:         try:             response = await httpx_client.get(bcs_link)         except:             await asyncio.sleep(20)             continue          selector = Selector(text=response.text)          for row in selector.xpath('//div[@class="feed__list"]/div/div')[::-1]:              raw_text = row.xpath('*//text()').extract()              title = raw_text[3]             summary = raw_text[5]              news_text = f'{title}\n{summary}'              head = news_text[:n_test_chars].strip()              if head in posted_q:                 continue              if send_message_func is None:                 print(news_text, '\n')             else:                 await send_message_func(f'bcs-express.ru\n{news_text}')              posted_q.appendleft(head)          await asyncio.sleep(10)  if __name__ == "__main__":      # Очередь из уже опубликованных постов, чтобы их не дублировать     posted_q = deque(maxlen=20)      # 50 первых символов от текста новости - это ключ для проверки повторений     n_test_chars = 50      httpx_client = httpx.AsyncClient()      asyncio.run(bcs_parser(httpx_client, posted_q, n_test_chars))

4. Запускаем все парсеры разом

Т.к. каждый парсер реализован асинхронно, то, чтобы они работали все вместе, добавим их в один общий цикл событий (event_loop). Это сделано для экономии ресурсов.

Примечание: в обычном синхронном коде, когда процесс в исполняющем потоке доходит до места, где требуются внешние ресурсы, он блокирует исполнение, ожидая ответа. При асинхронной реализации программы исполняющий поток занимается другим процессом — за счет этого и увеличивается производительность.

Тут же стоит отметить, что очередь posted_q (класс deque() модуля collections в python) является потокобезопасной, т.е. можно спокойно добавлять в неё новости из разных парсеров.

main.py

Показать…

import httpx import asyncio from collections import deque  from telegram_parser import telegram_parser from rss_parser import rss_parser from bcs_parser import bcs_parser  loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)  # Канал куда скидываем новости, например @habr_agg, сюда введи свой канал channel_habr_agg = 'https://t.me/habr_agg'  # Очередь из уже опубликованных постов, чтобы их не дублировать posted_q = deque(maxlen=40)  # 50 первых символов от текста новости - это ключ для проверки повторений n_test_chars = 50  async def send_message_func(message):     '''Отправляет посты в канал'''     await client.send_message(entity=channel_habr_agg, message=message)  # Телеграм парсер client = telegram_parser(send_message_func, loop)  httpx_client = httpx.AsyncClient()  # Добавляет в текущий event_loop rss парсер loop.create_task(rss_parser(httpx_client, posted_q,                  n_test_chars, send_message_func))  # Добавляет в текущий event_loop парсер сайта bcs-express.ru loop.create_task(bcs_parser(httpx_client, posted_q,                                    n_test_chars, send_message_func))  # Запускает все парсеры client.run_until_disconnected()

Заключение

Репозиторий с исходным кодом (сложный парсер) — на GitHub.

Как было сказано выше, сложный агрегатор — это усложнённый вариант агрегатора простого. Основные отличия — фильтр для постов, увеличенное количество источников новостей, логирование, доработанная обработка ошибок, имитация запроса пользователя через браузер, докер контейнер и др.

Сложный агрегатор написан таким образом, чтобы быть максимально живучим, однако в принципе состоит из тех же модулей, что и простой.

Можно конечно запустить готовый агрегатор новостей где-то в облаке, но лично у меня он работает на очень слабеньком тонком клиенте, в котором всего 4 Gb оперативной памяти и двухъядерный процессор 1.2 GHz, этого железа хватает с большим запасом. Для меня это удобно, т.к. не приходится постоянно держать включенным настольный компьютер или ноутбук, плюс тонкий клиент совершенно бесшумный.

В целом его работой я доволен, это действительно очень удобно, когда едешь куда-то или отошёл по делам, можно легко следить за новостями через мобильный телефон.

Спасибо за внимание.


ссылка на оригинал статьи https://habr.com/ru/post/689520/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *