Как я научил торгового бота рисовать свечные графики и перестал спамить текстом

от автора

Привет, Хабр! Меня зовут Николай Пискунов, я руководитель направления Big Data и эксперт курса Cloud DevSecOps по безопасной разработке от Академии вАЙТИ Beeline Cloud. Сегодня расскажу о разработке системы, которая строит свечные графики для трейдинг-бота на Python. Это полноценный инструмент анализа, который помогает принимать торговые решения в реальном времени. Важная часть этой системы — быстрая связь с пользователем через бота в Телеграме. 

Проблема и решение: как превратить текстовый спам в полезное сообщение 

Исходная система была простой: WebSocket-клиент получал тики, детектор паттернов находил «поглощение» или «стохастик» и отправлял сообщение. Вот как это выглядело:

📈 СИГНАЛ: EUR/USD OTC (15s) – BUY

Полезной информации — ноль. Пользователь не видел:

  • графика перед сигналом (был ли тренд?);

  • формы свечи, которая дала сигнал;

  • уровня входа относительно ближайших максимумов/минимумов.

Нужно было сделать так, чтобы бот:

  1. Автоматически рисовал график при обнаружении сигнала.

  2. Делал это быстро (менее 300 мс), чтобы не тормозить основной поток анализа.

  3. Отправлял график в Телеграм и сразу удалял его с сервера, чтобы не забивать место.

  4. Работал для любых таймфреймов — от 15 секунд до 1 дня.

Звучит как отдельный микросервис, но на деле всё решается грамотной архитектурой в рамках одного приложения. Поехали!

Шаг 1. Архитектурная особенность: два кеша, чтобы не смешивать анализ и визуализацию

Первая и самая важная архитектурная ошибка, которую я совершил в первой версии и хотел исправить в этой, — хранить все свечи в одном месте. Анализу нужны только свежие данные (последние 20–50 свечей), а для графика — больше истории (до 500 свечей), и она может быть старой.

Смешивать их — значит, засорять память и замедлять анализ. Решение: два отдельных кеша внутри главного класса монитора.

python class AdvancedMonitor:def __init__(self, config: Dict):     # ... инициализация других компонентов ...     # Кеш для АНАЛИЗА: только свежие свечи. Храним по правилам из конфига.     self.analysis_candles = {}     # Кеш для ГРАФИКОВ: максимум истории. Храним до 500 свечей.     self.chart_candles = {}     self.max_chart_candles = 500 async def _on_tik_received(self, tik: Dict):     # ... обработка тика и создание свечи (aggregated_candle) ...     # Дальше — магия разделения     key = f"{pair}_{timeframe}"     # 1. Сохраняем ВСЁ в кеш для графиков     if key not in self.chart_candles:         self.chart_candles[key] = []     self.chart_candles[key].append(aggregated_candle)     # Ограничиваем длину для экономии памяти     if len(self.chart_candles[key]) > self.max_chart_candles:         self.chart_candles[key] = self.chart_candles[key][-self.max_chart_candles:]      # 2. Сохраняем только СВЕЖЕЕ в кеш для анализа     current_time = datetime.now().timestamp()     candle_age = current_time - aggregated_candle.get('timestamp', 0)     timeframe_max_age = self.get_max_candle_age_for_timeframe(timeframe) # 45 сек. для 15s      if candle_age <= timeframe_max_age:         if key not in self.analysis_candles:             self.analysis_candles[key] = []         self.analysis_candles[key].append(aggregated_candle)         # Ограничиваем длину для анализа (из конфига)         max_store = self.get_max_candles_to_store(timeframe) # 400 для 15s         if len(self.analysis_candles[key]) > max_store:             self.analysis_candles[key] = self.analysis_candles[key][-max_store:]

Зачем так сложно?

Анализ теперь работает с небольшим, очень релевантным набором данных, что повышает скорость проверки паттернов.

Визуализация имеет доступ к более глубокой истории, чтобы нарисовать красивый график с контекстом. Они не мешают друг другу.

Шаг 2. Агрегатор в реальном времени переводит тики в свечи

На вход мы получаем от WebSocket API поток сырых тиков (цен). Нам нужно самим собирать из них свечи. Для этого я написал класс RealTimeCandleAggregator. Он получает на вход каждый тик, определяет, к какой свече по времени он относится, и обновляет ее OHLCV (Open, High, Low, Close, Volume).

Важный нюанс: таймфрейм может быть секундным (15s, 30s), поэтому нужно аккуратно округлять время.

pythonclass RealTimeCandleAggregator:# ... инициализация ...def add_tick(self, pair: str, timeframe: str, tick: Dict) -> Optional[Dict]:     tick_time = datetime.fromtimestamp(tick.get('timestamp', 0))     # Получаем время начала свечи для этого тика     candle_start = self._get_candle_start_time(tick_time, timeframe)     # Ключ для хранения тиков текущей свечи     key = f"{pair}_{timeframe}"     # Если началась новая свеча, закрываем старую и создаем новую     if key not in self.current_candle_start or candle_start > self.current_candle_start[key]:         # Формируем свечу из накопленных тиков         finished_candle = self._create_candle_from_ticks(key, pair, timeframe)         # Сбрасываем буфер для новой свечи         self.current_candle_start[key] = candle_start         self.ticks_storage[key] = deque()         self.ticks_storage[key].append(tick)          if finished_candle:             return finished_candle # Возвращаем ЗАКОНЧИВШУЮСЯ свечу         else:             return None     # Добавляем тик в текущую свечу     self.ticks_storage[key].append(tick)     return None # Свеча еще не закрытаdef _get_candle_start_time(self, tick_time: datetime, timeframe: str) -> datetime:     seconds = self.timeframe_seconds.get(timeframe, 60)     if seconds < 60:         # Для секундных таймфреймов         seconds_floor = (tick_time.second // seconds) * seconds         return tick_time.replace(second=seconds_floor, microsecond=0)     else:         # Для минутных и часовых         # ... логика округления минут ...         pass

 Как это работает в связке:

  1. В WebSocket-клиенте на каждый входящий тик вызывается ontik_received.

  2. Этот метод передает тик в candle_aggregator.add_tick().

  3. Если агрегатор возвращает свечу (значит, старый интервал закончился), мы отправляем ее в обработку.

Шаг 3. Сердце визуализации — класс CandleChartMaker

Когда агрегатор возвращает свечу, она попадает в оба кеша (для анализа и графиков). А когда детектор паттернов находит сигнал, нам нужно создать график. Вся логика по работе с matplotlib и mplfinance вынесена в отдельный класс CandleChartMaker.

Почему mplfinance? Это надстройка над matplotlib, специально заточенная под финансовые графики. Она из коробки умеет рисовать японские свечи, настраивать цвета для бычьих/медвежьих свечей, добавлять объемы и делать это красиво.

Вот упрощенный, но полностью рабочий код этого класса, который лежит в основе статьи:

pythonimport loggingfrom datetime import datetimefrom pathlib import Pathfrom typing import Dict, List, Optional import pandas as pdimport mplfinance as mpfimport matplotlib.pyplot as plt logger = logging.getLogger(__name__) class CandleChartMaker:"""Генератор красивых свечных графиков для Телеграма.""" def __init__(self, plot_dir: str = "logs/tmp/plots"):     self.plot_dir = Path(plot_dir)     self.plot_dir.mkdir(parents=True, exist_ok=True) def create_chart_from_cache(self, pair: str, timeframe: str, candles: List[Dict],                             signal_type: str = None, signal_price: float = None) -> Optional[str]:     """     Основной метод для создания графика.     """     if not candles or len(candles) < 5:         logger.warning(f"Недостаточно данных для графика {pair}")         return None     # 1. Превращаем список свечей в DataFrame для mplfinance     df = self._prepare_dataframe(candles)     if len(df) < 2:         return None     # 2. Генерируем уникальное имя файла     timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]     safe_pair = pair.replace('/', '_').replace(' ', '_')     filename = f"{safe_pair}_{timeframe}_{signal_type}_{timestamp}.png"     plot_path = self.plot_dir / filename     # 3. Пытаемся нарисовать график через mplfinance     success = self._create_mplfinance_chart(df, pair, timeframe, plot_path, signal_type, signal_price)      if success:         logger.info(f" График готов: {plot_path}")         return str(plot_path)     else:         logger.error(f" Не удалось создать график для {pair}")         return None def _prepare_dataframe(self, candles: List[Dict]) -> pd.DataFrame:     """Конвертирует список свечей в DataFrame, готовый для mplfinance."""     data = []     for c in candles:         # Убеждаемся, что все ключи есть, и конвертируем время         if not all(k in c for k in ['datetime', 'open', 'high', 'low', 'close']):             continue         try:             dt = pd.to_datetime(c['datetime'])             data.append({                 'datetime': dt,                 'open': float(c['open']),                 'high': float(c['high']),                 'low': float(c['low']),                 'close': float(c['close']),                 'volume': float(c.get('volume', 0)),             })         except (ValueError, TypeError) as e:             logger.debug(f"Ошибка парсинга свечи: {e}")             continue      if not data:         return pd.DataFrame()      df = pd.DataFrame(data)     df = df.sort_values('datetime')     # mplfinance требует, чтобы индексом был datetime     df.set_index('datetime', inplace=True)     return df def _create_mplfinance_chart(self, df: pd.DataFrame, pair: str, timeframe: str,                               output_path: Path, signal_type: str = None,                               signal_price: float = None) -> bool:     """Внутренний метод для рисования с помощью mplfinance."""     try:         # Настройка цветовой схемы         mc = mpf.make_marketcolors(             up='#26a69a', # Зеленый для бычьих             down='#ef5350',  # Красный для медвежьих             wick='inherit',             volume='in',             edge='inherit'         )         s = mpf.make_mpf_style(marketcolors=mc, gridstyle='--', y_on_right=False)         # Будем добавлять дополнительные линии на график         addplots = []         # 1. Линия цены входа (сигнала)         if signal_price is not None:             signal_line = [signal_price] * len(df)             color = 'green' if signal_type and 'BUY' in signal_type.upper() else 'red'             label = f"Вход: {signal_price:.5f}"             ap = mpf.make_addplot(signal_line, color=color, linestyle='--', width=1, label=label)             addplots.append(ap)         # 2. Скользящие средние (если хватает данных)         if len(df) >= 10:             ma10 = df['close'].rolling(10).mean()             ap_ma10 = mpf.make_addplot(ma10, color='orange', width=0.8, label='MA10')             addplots.append(ap_ma10)         if len(df) >= 20:             ma20 = df['close'].rolling(20).mean()             ap_ma20 = mpf.make_addplot(ma20, color='blue', width=0.8, label='MA20')             addplots.append(ap_ma20)         # Формируем заголовок         title = f"{pair} ({timeframe})"         if signal_type:             title += f" — {signal_type} Сигнал"         # Создаем фигуру. Параметр `returnfig=True` дает нам доступ к объекту Figure         fig, axes = mpf.plot(             df,             type='candle',             style=s,             title=title,             ylabel='Цена',             volume=True,  # Показываем объем             addplot=addplots,             figsize=(12, 7),             panel_ratios=(3, 1),  # Соотношение графика свечей и объема             returnfig=True,             tight_layout=True         )         # Добавляем немного статистики в правый верхний угол         self._add_stats_text(fig, df, pair)         # Сохраняем         fig.savefig(output_path, dpi=120, bbox_inches='tight')         plt.close(fig)         return True      except Exception as e:         logger.error(f"Ошибка mplfinance: {e}", exc_info=True)         return False def _add_stats_text(self, fig, df: pd.DataFrame, pair: str):     """Добавляет текстовую статистику на график."""     last_candle = df.iloc[-1]     price_change = last_candle['close'] - df.iloc[0]['open']     price_change_pct = (price_change / df.iloc[0]['open']) * 100     # Считаем бычьи/медвежьи свечи на графике     bullish = (df['close'] >= df['open']).sum()     bearish = len(df) - bullish      stats_text = (         f" {pair}n"         f"Close: {last_candle['close']:.5f}n"         f"Change: {price_change:+.5f} ({price_change_pct:+.2f}%)n"         f"Bull/Bear: {bullish}/{bearish}"     )     # Размещаем текст в координатах фигуры     fig.text(0.83, 0.85, stats_text, fontsize=8,              bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8)) def cleanup_old_plots(self, max_age_hours: int = 1):     """Удаляет старые графики, чтобы не забивать диск."""     # ... логика удаления файлов ...     pass

Шаг 4. Интеграция с Телеграмом

Последний шаг — самый простой и приятный. Когда сигнал обнаружен и график создан, вызываем метод нашего TelegramBotRunner (который внутри использует aiogram), передаем ему путь к картинке и данные сигнала.

pythonasync def _send_signal_to_subscribers(self, signal: Dict, candles_data: List[Dict] = None):if not self.telegram_manager:     returntry:     # 1. Формируем данные для сообщения     signal_data = {         'pair': signal['pair'],         'direction': 'buy' if signal['direction'] == 'bullish' else 'sell',         'confidence': signal['confidence'],         'timeframe': signal['timeframe'],         'price': signal['current_price'],         'pattern': signal['pattern'],         'strategy': signal.get('strategy', 'unknown'),         'expiry_minutes': signal.get('expiry_minutes', 2) # Время экспирации     }     # 2. СОЗДАЕМ ГРАФИК, используя данные из chart_candles!     chart_path = None     # Берем последние 50 свечей из кеша для графиков     chart_candles = self.get_candles_for_chart(signal['pair'], signal['timeframe'], 50)     if chart_candles:         try:             signal_type = 'BUY' if signal['direction'] == 'bullish' else 'SELL'             chart_path = self.chart_maker.create_chart_from_cache(                 pair=signal['pair'],                 timeframe=signal['timeframe'],                 candles=chart_candles,                 signal_type=signal_type,                 signal_price=float(signal['current_price'])             )         except Exception as e:             logger.error(f"Ошибка генерации графика: {e}")     # 3. Отправляем в Телеграм     await self.telegram_manager.send_signal(signal_data, chart_path)     # 4. Удаляем временный файл графика (менеджер сделает это сам)     # ...except Exception as e:     logger.error(f"Ошибка отправки сигнала: {e}")

Результат: что видит пользователь

Вот что в итоге получает пользователь в Телеграме. Вместо скучной строчки — полноценный анализ.

🔴 ТОРГОВЫЙ СИГНАЛ 🔴

Пара: EUR/RUB OTC

Стратегия: Engulfing

Направление: SELL

Таймфрейм: 15s

Экспирация: 2 мин.

Цена: 85.14383

Паттерн: engulfing

Уверенность: 🟢 HIGH

Именно это и нужно трейдеру для принятия решения.

Подводные камни и их решение

В процессе разработки я столкнулся с несколькими проблемами, о которых стоит упомянуть:

  1. Дубликаты данных. При переподключении к WebSocket или при загрузке истории могли приходить уже обработанные тики. Это приводило к искажению свечей. Решение — дедупликатор, который хранит ключи (пара + таймфрейм + время + цена) и отсеивает повторы.

  2. Сбой mplfinance. Библиотека отличная, но, как и любой код, иногда падает с неочевидными ошибками. На этот случай у меня был план Б — резервный метод createquick_chart, который рисовал примитивный график через чистый matplotlib. Система должна быть отказоустойчивой.

  3. Утечка памяти. Если генерировать по 1000 графиков в день и не удалять их, диск быстро забьется. Метод cleanup_old_plots решает эту проблему, оставляя графики жить не более часа.

Заключение

В результате получилась самодостаточная, надежная система визуализации — не просто придаток, а полноценная часть торгового робота. Результаты, на мой взгляд, хорошие, вот какие метрики я получил на реальных данных:

  • Время создания графика: 100–300 мс.

  • Размер файла PNG: 50–100 КБ.

  • Память на 1000 графиков: ~100 МБ.

  • Поддерживаемые пары: 50+ одновременно.

Beeline Cloud — безопасный облачный провайдер. Разрабатываем облачные решения, чтобы вы предоставляли клиентам лучшие сервисы.

ссылка на оригинал статьи https://habr.com/ru/articles/1026056/