ParallelBeautifulSoup (BS4-hack)

от автора

Предлагаю протестировать скрипт написанный на основе заготовки cloude 3.5 Sonnet с использованием специального промта. Пришлось почти полностью переписать, из-за товарищей в комментариях. Критика это хорошо когда обоснована.

Документацию доделаю потом. Внизу есть пример скрипта.

Системный промт через который нейронка дала заготовку для «Хака».

Скрытый текст

Ты — высококвалифицированный Python-программист и инженер данных в области машинного обучения (ML). Твоя задача — помогать пользователям в написании и отладки кода, анализе данных, оптимизации кода. Ты управляешь агентами экспертной системы, которые помогают тебе в решении различных задач.

Алгоритм логики ответа (Рабочий процесс — Цепочка размышлений)

  1. Уточнение: Задавай вопросы, уточняющие детали контекста. Попроси четко сформулировать задачу по технике SMART (Specific, Measurable, Achievable, Relevant, Time-bound).

  2. Определи тематическую область, выбери агента экспертной системы для решения задачи.

  3. Предварительная обработка запроса: Выдели ключевые элементы. Раздели один большой запрос на несколько подзапросов, задавай сам себе гипотетические вопросы о подзапросах и отвечай на них.

  4. Обработка данных: Проверяй свои гипотезы, аргументируй свою точку зрения, используй комбинацию аналитического и дедуктивного мышления. Определи уровни достоверности для каждого шага рассуждений. Отвечай какие препятствия и трудности могут возникнуть.

  5. Подготовка промежуточного ответа: Критически оцени свой ответ. На основе критики исправь ошибки, дополни код, обеспечь полноту и согласованность ответа. Только после этого переходи к выводу окончательного результата!

  6. Вывод окончательного результата: Необходимо предоставить детализированный, структурированный ответ с примерами, шаг за шагом. Если вы не знаешь ответа на задачу или вопрос, просто скажи что не знаешь, а не выдумывай.

  7. Обратная связь: Запрашивай обратную связь у пользователя после выполнения задачи и предлагай улучшения на основе его комментариев.

Приоритет на высокую скорость обработки данных скрипта и оптимизацию потребления памяти для обработки больших объёмов данных. Используй только те библиотеки которые позволяют добиться высокой скорости обработки данных скрипта при широкой функциональности не приводящие к неоправданно высокому потреблению памяти.

Техники оптимизации кода:

  • Замена обычных циклов for in где это возможно на оптимизированные генераторы List/generator Comprehension. Пример: collections.deque([item for item in strings]).

  • Для хранения неизменных списков используй numpy.array или tuple.

  • Используй map, filter, functools.reduce для оптимизации.

  • Используй модуль typing для типизации.

  • Используй декоратор functools.lru_cache для эффективной работы с памятью.

  • Используй комбинацию корутин asyncio, процессов-демонов Process(daemon=True) и очередей Queue из модуля multiprocessing для параллельного выполнения задач с защитой от глобальной блокировки интерпретатора.

  • Используй профилирование cProfile для выявления узких мест.

Ограничения:

  • Не используй классы. Применяй Inline-функции def.

  • Обеспечь совместимость кода python 3.10-3.12 включительно.

  • Не используй логирование.

Scrapy конечно хорошая библиотека, но у него много лишнего функционала, нету модульности и иногда очень странно парсит данные с сайта вырывая кусками.

Установка зависимостей:

pip install beautifulsoup4 lxml numpy psutil

Файл fast_soup.py:

from concurrent.futures import ProcessPoolExecutor from multiprocessing import cpu_count import asyncio from functools import lru_cache import psutil from typing import AsyncIterator, Optional, Dict, Any from collections import deque import numpy as np from bs4 import BeautifulSoup, Tag import platform import warnings import _pickle as cPickle   def _get_optimal_workers() -> int:     """Возвращает оптимальное количество процессов"""     return min(cpu_count(), 61) if platform.system() == 'Windows' else cpu_count()   def _get_optimal_cache_size() -> int:     """Определяет оптимальный размер кэша based on system memory"""     available_memory = psutil.virtual_memory().available     workers = _get_optimal_workers()     base_cache_size = workers * 32      if available_memory < 2 * 1024 * 1024 * 1024:  # < 2GB         return min(base_cache_size, 64)     elif available_memory < 4 * 1024 * 1024 * 1024:  # < 4GB         return min(base_cache_size, 128)     return min(base_cache_size, 256)   @lru_cache(maxsize=_get_optimal_cache_size()) def _cached_parse(html: str, parser: str) -> BeautifulSoup:     """Кэширует создание объекта BeautifulSoup"""     return BeautifulSoup(html, parser)   def _should_clear_cache() -> bool:     """Проверяет необходимость очистки кэша"""     cache_info = _cached_parse.cache_info()     return (cache_info.currsize / cache_info.maxsize) > 0.8   def _parse_nested_tags(tag: Tag, nested_attrs: Dict[str, Any]) -> np.ndarray:     """Рекурсивно ищет вложенные теги с заданными атрибутами"""     results = deque(tag.find_all(nested_tag, attrs if attrs else {})                for nested_tag, attrs in nested_attrs.items())     return np.array(results, dtype=object)   def _parse_chunk(         html: str,         tag: Optional[str] = None,         attrs: Optional[dict] = None,         nested_attrs: Optional[Dict[str, Dict]] = None,         parser: str = 'lxml' ) -> np.ndarray:     """Парсит HTML в отдельном процессе с поддержкой вложенных тегов"""     try:         soup = _cached_parse(html, parser)         results = deque()          if tag:             initial_tags = soup.find_all(tag, attrs if attrs else {})             results.extend(initial_tags)              if nested_attrs and initial_tags:                 nested_results = [_parse_nested_tags(initial_tag, nested_attrs)                                   for initial_tag in initial_tags]                 results.extend([item for sublist in nested_results for item in sublist])         else:             results.append(soup)          if _should_clear_cache():             _cached_parse.cache_clear()          return cPickle.dumps(np.array([r for r in results if r is not None], dtype=object))      except Exception as e:         warnings.warn(f"Ошибка парсинга: {e}")         return cPickle.dumps(np.array([]))   async def parallel_parse(         html: str,         tag: Optional[str] = None,         attrs: Optional[dict] = None,         nested_attrs: Optional[Dict[str, Dict]] = None,         max_workers: Optional[int] = None,         timeout: float = 30.0,         parser: str = 'lxml' ) -> AsyncIterator[Tag]:     """     Асинхронный парсер с поддержкой вложенных тегов     """     try:         max_workers = max_workers or _get_optimal_workers()         loop = asyncio.get_running_loop()          with ProcessPoolExecutor(max_workers=max_workers) as executor:             task = loop.run_in_executor(                 executor,                 _parse_chunk,                 html,                 tag,                 attrs,                 nested_attrs,                 parser             )              try:                 serialized_result = await asyncio.wait_for(task, timeout)                 result = cPickle.loads(serialized_result)                  for item in result:                     if isinstance(item, Tag):                         yield item              except asyncio.TimeoutError:                 warnings.warn("Превышен таймаут задачи")                 _cached_parse.cache_clear()             except Exception as e:                 warnings.warn(f"Ошибка выполнения задачи: {e}")      except Exception as e:         warnings.warn(f"Общая ошибка парсинга: {e}")     finally:         _cached_parse.cache_clear()

Документация по асинхронному HTML парсеру

Содержание

  1. Общее описание

  2. Основные функции

  3. Примеры использования

  4. Оптимизация производительности

  5. Обработка ошибок

1. Общее описание

Данный модуль представляет собой высокопроизводительный асинхронный HTML парсер с поддержкой многопроцессорной обработки. Основные преимущества:

  • Асинхронное выполнение

  • Многопроцессорная обработка

  • Кэширование результатов

  • Оптимизированное использование памяти

  • Поддержка вложенных тегов

2. Основные функции

parallel_parse

Основная функция для асинхронного парсинга HTML:

async def parallel_parse(     html: str,                    # HTML строка для парсинга     tag: Optional[str] = None,    # Тег для поиска     attrs: Optional[dict] = None, # Атрибуты тега     nested_attrs: Optional[Dict[str, Dict]] = None, # Вложенные теги     max_workers: Optional[int] = None,  # Количество процессов     timeout: float = 30.0,        # Таймаут выполнения     parser: str = 'lxml'          # Тип парсера ) -&gt; AsyncIterator[Tag] 

3. Примеры использования

Базовый пример

import asyncio  async def main():     html = """                           <div class="content">                 <p>Текст 1</p>                 <p>Текст 2</p>             </div>                   """          async for tag in parallel_parse(html, tag='p'):         print(tag.text)  asyncio.run(main()) 

Поиск с атрибутами

async def find_with_attrs():     html = """     <div>         <span class="price">100</span>         <span class="price">200</span>         <span class="name">Product</span>     </div>     """          async for tag in parallel_parse(         html,          tag='span',          attrs={'class': 'price'}     ):         print(tag.text) 

Поиск вложенных тегов

async def nested_search():     html = """     <div class="product">         <h2>Product Title</h2>         <div class="details">             <span class="price">100</span>         </div>     </div>     """          nested_attrs = {         'h2': {},         'span': {'class': 'price'}     }          async for tag in parallel_parse(         html,         tag='div',         attrs={'class': 'product'},         nested_attrs=nested_attrs     ):         print(tag.text) 

4. Оптимизация производительности

Настройка количества процессов

async def optimized_parse():     # Установка оптимального количества процессов     max_workers = cpu_count() - 1          async for tag in parallel_parse(         html,         tag='div',         max_workers=max_workers     ):         process_tag(tag) 

Управление таймаутом

async def with_timeout():     try:         async for tag in parallel_parse(             large_html,             timeout=5.0  # 5 секунд таймаут         ):             process_tag(tag)     except Exception as e:         print(f"Превышен таймаут: {e}") 

5. Обработка ошибок

Обработка исключений

async def handle_errors():     try:         async for tag in parallel_parse(             malformed_html,             tag='div'         ):             process_tag(tag)     except Exception as e:         print(f"Ошибка парсинга: {e}")     finally:         # Очистка ресурсов         cleanup_resources() 


ссылка на оригинал статьи https://habr.com/ru/articles/854378/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *