Как я написал алгоритмического бота на Python для торговли по индикаторам на Bybit

от автора

Привет, Хабр!

Я Python-разработчик с уклоном в алготрейдинг. В этой статье я расскажу, как написал простого торгового бота под Bybit, который торгует по классической стратегии на Bollinger Bands. Покажу базовую архитектуру, работу с API, индикаторы и открытие сделок. Подойдёт тем, кто только начинает работать с биржами и хочет потрогать живого бота руками.


📌 Что такое Bollinger Bands?

Полосы Боллинджера — это технический индикатор, основанный на скользящей средней и стандартном отклонении. По сути:

  • Середина (middle): простая скользящая средняя (обычно 20 периодов).

  • Верхняя полоса (upper): SMA + 2 * std.

  • Нижняя полоса (lower): SMA — 2 * std.

Идея: покупать при касании нижней полосы и продавать при касании верхней. Также нужно добавить индикаторы для повышения точности сделок, так как работа только с BB даст плохой результат.


🔍 Другие полезные индикаторы

Для более точной работы лучше использовать также доп индикаторы. Я использовал следущие:

1. RSI (Relative Strength Index)

  • Моментум-индикатор, показывает перекупленность/перепроданность.

  • RSI < 30 — перепроданность (лонг), RSI > 70 — перекупленность (шорт)

  • Отлично работает в паре с Bollinger Bands

2. EMA (Exponential Moving Average)

  • Быстрая реакция на изменения цены

  • Используется как фильтр тренда: цена выше EMA → приоритет лонг

3. ATR (Average True Range)

  • Измеряет волатильность

  • Используется для адаптивного стоп-лосса или фильтрации низковолатильных участков

4. CSI / CSC (Cluster Strength Index / Cluster Signal Confirmation)

  • Индикаторы, основанные на кластеризации свечей и плотности сигналов

  • Используются для подтверждения намерения цены двигаться в выбранном направлении

  • Применяются в авторских стратегиях с фильтрами

🛠️ Что будем использовать

  • requests — для доступа к данным.

  • pybit — работа с API Bybit.

  • pandas, numpy — расчёты индикатора.

  • datetime, time — для контроля времени.

  • threading — для многопоточности.

  • telebot (опционально) — для уведомлений в Telegram.

📁 Структура проекта

bollinger_bot/ ├── main.py              # Основная логика ├── back.py              # Тестирование стратегии на исторических данных ├── config.cfg           # конфиг файл с найстроками (опционально)

Для начала нам необходимо создать конфиг. Это можно сделать как отдельным файлом, так и просто вставить в код. Для простоты пока выберем второй вариант. Вот лучший конфиг, который я нашел за время бектестов:

import pandas as pd import numpy as np import time import datetime from collections import deque from scipy.stats import zscore from binance.client import Client as BinanceClient from pybit.unified_trading import HTTP import telebot # === НАСТРОЙКИ ===  symbol = "ETHUSDT" interval = "5m"  bb_period = 40 bb_std = 1  STOP_LOSS_PCT = 0.004  client = BinanceClient()  config = {     'min_cluster': 3,     'bull_quant': 0.75,     'bear_quant': 0.25,     'rsi': 60 }

📈 Получение исторических данных

Сначала подключим pybit и получим свечи:

def fetch_klines_paged(symbol=symbol, interval=interval, total_bars=100000, client=None):     if client is None:         client = Client()      limit = 1000     data = []     end_time = int(time.time() * 1000)      while len(data) < total_bars:         bars_to_fetch = min(limit, total_bars - len(data))         try:             klines = client.futures_klines(symbol=symbol, interval=interval, limit=bars_to_fetch, endTime=end_time)         except Exception as e:             print("Ошибка Binance API:", e)             break          if not klines:             break          data = klines + data         end_time = klines[0][0] - 1         time.sleep(0.2)      df = pd.DataFrame(data, columns=[         'timestamp', 'open', 'high', 'low', 'close', 'volume',         'close_time', 'quote_asset_volume', 'number_of_trades',         'taker_buy_base', 'taker_buy_quote', 'ignore'     ])     df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')     df[['open','high','low','close','volume']] = df[['open','high','low','close','volume']].astype(float)     df = df.drop_duplicates('timestamp').sort_values('timestamp').reset_index(drop=True)     return df

С помощью клиента bybit мы можем получить множество свечей из истории. Однако при вызове функции есть лимит в 1500 свечей. Поэтому необходимо будет внедрить функцию, способную подгрузить больше, для бектеста.

Хранилищем для свечей у нас будет выступать переменная df. Позже для расчёта индикаторов также придется использовать доп. функцию и будем вырезать из нее до 50000 свечей, для корректного расчёта кластеров. У свечей есть следующие аргументы:

  1. timestamp — время открытия свечи

  2. open — открытие свечи

  3. high — максимум свечи

  4. low — минимум свечи

  5. close — закрытие свечи

  6. volume — обьём, в данном коде он нам не нужен

Расчёт необходимых индикаторов

все технические особенности и математику индикаторов можно найти в открытым доступе. Для их описания можно написать большую отдельную статью, пока что остановимся на сухом коде. Из важных деталей можно сказать, что я использовал

  1. period = 450 в расчёте RSI. Этот период показал наилучший результат с поправкой на ATR. Без него целесообразно использовать меньшие цифры.

  2. В бектесте я рассчитываю индикаторы по всей длине бектеста (в моём случае 100000 свечей по 5м). Но в реальном боте, чтобы результат был аналогичен тесту, необходимо тоже подгружать исторические свечи, желательно тоже 50000-100000 шт.

  3. В ATR.rolling() использую аргумент 14. Это сглаживание, 14 — классическое значение, тут оно мне вполне подходит.

def compute_rsi(df, period=450):     delta = df['close'].diff()     gain = delta.clip(lower=0)     loss = -delta.clip(upper=0)     avg_gain = gain.rolling(period, min_periods=1).mean()     avg_loss = loss.rolling(period, min_periods=1).mean()     rs = avg_gain / avg_loss     df['RSI'] = 100 - (100 / (1 + rs))     df['RSI'] = df['RSI'].fillna(method='bfill')     return df  def compute_csc(df, min_cluster, bull_quant, bear_quant):     bull_thr = df['CSI'].quantile(bull_quant)     bear_thr = df['CSI'].quantile(bear_quant)      df['sentiment'] = np.where(df['CSI'] >= bull_thr, 'bull',                         np.where(df['CSI'] <= bear_thr, 'bear', 'neutral'))     df['cluster_id'] = pd.Series(dtype='object')     curr_type, curr_start, length = None, None, 0      for i, s in df['sentiment'].items():         if s == curr_type and s in ['bull', 'bear']:             length += 1         else:             if curr_type in ['bull', 'bear'] and length >= min_cluster:                 df.loc[curr_start:i-1, 'cluster_id'] = f"{curr_type}_{curr_start}"             if s in ['bull', 'bear']:                 curr_type, curr_start, length = s, i, 1             else:                 curr_type, length = None, 0      if curr_type in ['bull', 'bear'] and length >= min_cluster:         df.loc[curr_start:df.index[-1], 'cluster_id'] = f"{curr_type}_{curr_start}"      return df  def compute_bollinger(df):     df['ma'] = df['close'].rolling(bb_period).mean()     df['std'] = df['close'].rolling(bb_period).std()     df['upper'] = df['ma'] + bb_std * df['std']     df['lower'] = df['ma'] - bb_std * df['std']     return df  def get_csi(df):     body = (df['close'] - df['open']).abs()     rng = (df['high'] - df['low']).replace(0, np.nan)     body_ratio = body / rng     direction = np.where(df['close'] > df['open'], 1, -1)     vol_score = df['volume'] / df['volume'].rolling(50).max()     range_z = zscore(df['high'] - df['low']).clip(-3, 3)      tr = pd.DataFrame({         'hl': df['high'] - df['low'],         'hc': (df['high'] - df['close'].shift(1)).abs(),         'lc': (df['low'] - df['close'].shift(1)).abs()     }).max(axis=1)      atr = tr.rolling(14).mean().bfill()     df['CSI'] = direction * (0.5 * body_ratio + 0.3 * vol_score + 0.2 * range_z) / atr     return df

Добавим функцию с условием входа

Уже ранее мы писали конфиг для бота. Конфиг используется как для расчёта индикаторов, так и для расчёта условия входа. Мой бот будет работать в две стороны — и в лонг, и в шорт. Важно сказать, что на практике формации в лонг отрабатывают на 10-15% чаще, что связанно с особенностью математики рынка (грубо говоря, падать тяжелее, чем расти).

def check_signal_row(row, prev_row):     if np.isnan(row['lower']) or np.isnan(prev_row['CSI']) or np.isnan(row['CSI']):         return None     cluster = row['cluster_id']     if not isinstance(cluster, str):         return None      long_cond = (         row['close'] < row['lower'] and         row['CSI'] > 0 and row['CSI'] > prev_row['CSI'] and         cluster.startswith('bull') and row['RSI'] < config['rsi']     )     short_cond = (         row['close'] > row['upper'] and         row['CSI'] < 0 and row['CSI'] < prev_row['CSI'] and         cluster.startswith('bear') and row['RSI'] > (100 - config['rsi'])     )      if long_cond:         return 'buy'     elif short_cond:         return 'sell'     return None

Теперь, после того как мы написали функцию проверки сигнала, можно подготовить логирование и протестировать наш код. Добавим переменные с сделками, где сработал сигнал, а также полный файл со всеми свечами в csv файл:

if __name__ == '__main__':     df = fetch_klines_paged(symbol, interval, 100000, client)     df = compute_rsi(df)     df = compute_bollinger(df)     df = get_csi(df)     df = compute_csc(df, config['min_cluster'], config['bull_quant'], config['bear_quant'])      signals = [None]     for i in range(1, len(df)):         signals.append(check_signal_row(df.iloc[i], df.iloc[i - 1]))     df['signal'] = signals           in_position = False     entry_price = None     entry_index = None     position_type = None      completed_trades = []      for i in range(1, len(df)):         row = df.iloc[i]         signal = row['signal']          # === Вход в позицию ===         if not in_position and signal in ['buy', 'sell']:             in_position = True             entry_index = i             entry_price = row['close']             position_type = 'long' if signal == 'buy' else 'short'             stop_price = (                 entry_price * (1 - STOP_LOSS_PCT) if position_type == 'long'                 else entry_price * (1 + STOP_LOSS_PCT)             )          # === Выход из позиции ===         elif in_position:             exit_index = entry_index + 15             exit_row = df.iloc[i]             low, high = exit_row['low'], exit_row['high']             hit_stop = (                 low <= stop_price if position_type == 'long'                 else high >= stop_price             )              if hit_stop or i >= exit_index:                 exit_price = stop_price if hit_stop else exit_row['close']                 pnl = (                     (exit_price - entry_price) / entry_price * 100                     if position_type == 'long'                     else (entry_price - exit_price) / entry_price * 100                 )                 completed_trades.append({                     'entry_time': df.iloc[entry_index]['timestamp'],                     'exit_time': df.iloc[i]['timestamp'],                     'position_type': position_type,                     'entry_price': entry_price,                     'exit_price': exit_price,                     'pnl_%': pnl,                     'reason': 'stop_loss' if hit_stop else 'time_exit'                 })                 in_position = False      # === Сохраняем сделки ===     trades_df = pd.DataFrame(completed_trades)     trades_df.to_csv('trades_complete.csv', sep=';', index=False)      print("Последние сделки:")     print(trades_df.tail(10))      total_pnl = trades_df['pnl_%'].sum()     print(f"\nОбщий PnL по стратегии: {total_pnl:.2f}%") 

Для удобства бектеста добавил в конце вывод общего результата по стратегии.

На этом мы закончили с проведением бектеста. Бектест — всегда очень важная часть написанного вами бота. Бектесты можно проводить разными методами, многие опытные программисты реализуют бектест с отрисовкой позиций на графике. Но это усложненный вариант, и, по-моему мнению это не стоит того.

Приступим к написанию основного бота

Логика абсолютно целиком сохраняется. Расчёт индикаторов аналогичный. Я добавил для удобства отправление сообщений в телеграмм бота, чтобы следить.

Очень важное отличие — нужно две функции подгрузки свечей. Одна для свечей в реальном времени, вторая для исторических данных (нужно для расчёта индикатора csc).

Также необходимо добавить в конфиг:

# === НАСТРОЙКИ === symbol = "ETHUSDT" interval = "5m" bb_period = 40 bb_std = 1 STOP_LOSS_PCT = 0.004 TRADE_QTY_ETH = 1 EXIT_AFTER_BARS = 3 #15 минут TELEGRAM_CHAT_ID = config.tg_id #свой chat_id  # === API === bot = telebot.TeleBot("7871321841:AAGp9cbyLRCdO0VMfqK0v-x2eGCfIqebmVU") #tg bot BYBIT_API_KEY = config.apikey BYBIT_API_SECRET = config.bybit_secret  bybit = HTTP(api_key=BYBIT_API_KEY, api_secret=BYBIT_API_SECRET) #bybit init  # === КЛАСТЕРНЫЙ КОНФИГ === config = {     'min_cluster': 3,     'bull_quant': 0.75,     'bear_quant': 0.25,     'rsi': 60,     'total_bars': 60000 }  entry_history = deque(maxlen=100) open_positions = [] client = BinanceClient()

Теперь реализуем подгрузку свечей:

def fetch_klines_paged(symbol=symbol, interval=interval,  total_bars=60000, client = None):     if client is None:         client = BinanceClient()      limit = 1000     data = []     end_time = None  # самый последний бар (новейшая точка)      while len(data) < total_bars:         bars_to_fetch = min(limit, total_bars - len(data))          try:             klines = client.futures_klines(                 symbol=symbol,                 interval=interval,                 limit=bars_to_fetch,                 endTime=end_time             )         except Exception as e:             print("Ошибка Binance API:", e)             break          if not klines:             break          data = klines + data  # prepend! — старые свечи добавляем в начало         end_time = klines[0][0] - 1  # сдвиг назад по времени         time.sleep(0.2)      df = pd.DataFrame(data, columns=[         'timestamp', 'open', 'high', 'low', 'close', 'volume',         'close_time', 'quote_asset_volume', 'number_of_trades',         'taker_buy_base', 'taker_buy_quote', 'ignore'     ])     df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')     df[['open','high','low','close','volume']] = df[['open','high','low','close','volume']].astype(float)     df = df.drop_duplicates('timestamp')     df = df.sort_values('timestamp').reset_index(drop=True)     return df  def get_last_closed_candle():     df = fetch_klines_paged(total_bars=50000)     last_candle = df.iloc[-2]  # предпоследняя — она закрыта     now = datetime.datetime.now(datetime.timezone.utc)     if (now - last_candle['timestamp'].to_pydatetime()).total_seconds() >= 300:         return last_candle.to_frame().T     else:         print("⏳ Свеча ещё не закрыта. Пропускаем.")         return None  

Для упрощения я сделал функцию единой. Но в идеале добавлять функцию которая будет подгружать последние 100-200 свечей и также работать с ней. Это сказывается лишь на ресурсозатратности.

Также добавил def get_last_closed_candle(), чтобы отлеживать последнюю свечу и сигнал на ней. Можно было внедрять потоки, либо ещё хуже — time.sleep(), однако такой подход меня не устроил на дистанции. По опыту это работает гораздо хуже.

И теперь важный ньюанс, о котором сказал ранее. Необходимо добавить sub = df[], чтобы при расчёте индикатора кластеров (который работает на большом отрезке свечей) не возникло расхождения с бектестом. В других индикаторах это не требуется, так как в них и так используется <1000 свечей (максимум в rsi, 450).

def compute_csc(df):     sub = df.tail(min(50000, len(df)))     bull_thr = sub['CSI'].quantile(config['bull_quant'])     bear_thr = sub['CSI'].quantile(config['bear_quant'])     df['sentiment'] = np.where(df['CSI'] >= bull_thr, 'bull', np.where(df['CSI'] <= bear_thr, 'bear', 'neutral'))     df['cluster_id'] = pd.Series(dtype='object')     curr_type, curr_start, length = None, None, 0     for i, s in df['sentiment'].items():         if s == curr_type and s in ['bull','bear']:             length += 1         else:             if curr_type in ['bull','bear'] and length >= config['min_cluster']:                 df.loc[curr_start:i-1, 'cluster_id'] = f"{curr_type}_{curr_start}"             if s in ['bull','bear']:                 curr_type, curr_start, length = s, i, 1             else:                 curr_type, length = None, 0     if curr_type in ['bull','bear'] and length >= config['min_cluster']:         df.loc[curr_start:df.index[-1], 'cluster_id'] = f"{curr_type}_{curr_start}"     return df

Теперь создадим функции входа в позицию. В первую очередь это открытие позиции с помощью pybit. Также сделал функцию, чтобы бот точно не открывал несколько позиций в одной 5м свече.

def place_order(symbol, side, qty_eth, stop_price):     try:         bybit.place_order(             category="linear",             symbol=symbol,             side="Buy" if side == "long" else "Sell",             order_type="Market",             qty=qty_eth,             time_in_force="GoodTillCancel",             stopLoss=round(stop_price, 2)         )         bot.send_message(TELEGRAM_CHAT_ID, f"✅ Открыта {side.upper()} позиция на {qty_eth} ETH")     except Exception as e:         print("Ошибка ордера:", e)  def close_position(symbol, position_type, qty_eth):     try:         bybit.place_order(             category="linear",             symbol=symbol,             side="Sell" if position_type == "long" else "Buy",             order_type="Market",             qty=qty_eth,             time_in_force="GoodTillCancel"         )         bot.send_message(TELEGRAM_CHAT_ID, f"🔻 Закрыта {position_type.upper()} позиция ({qty_eth} ETH)")     except Exception as e:         bot.send_message(TELEGRAM_CHAT_ID, f"❗ Ошибка при закрытии позиции: {e}")  def can_enter_again(signal_type):     now = datetime.datetime.now(datetime.timezone.utc)     cooldown = 5 * 60     return not any((now - t).total_seconds() < cooldown and s == signal_type for t, s in entry_history)

Ну и добавим основную функцию с вызовом функций расчёта индикаторов:

bot.send_message(TELEGRAM_CHAT_ID, "📈 Бот запущен") df = fetch_klines_paged() last_checked_minute = None  while True:     try:         now = datetime.datetime.now(datetime.timezone.utc)         if now.minute % 5 == 0 and now.second < 10:             if last_checked_minute == now.minute:                 time.sleep(1)                 continue             last_checked_minute = now.minute              new_df = get_last_closed_candle()             if new_df is None:                 continue              df = pd.concat([df, new_df.tail(1)]).drop_duplicates('timestamp').reset_index(drop=True)             if len(df) > config['total_bars']:                 df = df.tail(config['total_bars'])              df = compute_bollinger(df)             df = get_csi(df)             df = compute_csc(df)             df = compute_rsi(df)             df['signal'] = [None] + [check_signal_row(df.iloc[i], df.iloc[i - 1]) for i in range(1, len(df))]              latest = df.iloc[-2]             signal = latest['signal']              if signal in ['buy', 'sell'] and can_enter_again(signal):                 entry_price = latest['close']                 stop_price = entry_price * (1 - STOP_LOSS_PCT) if signal == 'buy' else entry_price * (1 + STOP_LOSS_PCT)                 position_type = 'long' if signal == 'buy' else 'short'                 entry_time = datetime.datetime.now(datetime.timezone.utc)                  place_order(symbol, position_type, TRADE_QTY_ETH, stop_price)                 entry_history.append((entry_time, signal))                 open_positions.append({                     'type': position_type,                     'entry_price': entry_price,                     'stop_price': stop_price,                     'entry_time': entry_time                 })             positions_to_remove = []             current_price = latest['close']             updated_positions = []             for pos in open_positions[:]:                 entry_time = pos['entry_time']                 elapsed = (datetime.utcnow() - entry_time).total_seconds()                 position_data = bybit.get_positions(category="linear", symbol=symbol)["result"]["list"]                 position_size = float(position_data[0]['size']) if position_data else 0                  # Проверка на срабатывание стоп-лосса                 hit_stop = (                     (pos['type'] == 'long' and current_price <= pos['stop_price']) or                     (pos['type'] == 'short' and current_price >= pos['stop_price'])                 )                  if hit_stop or elapsed >= (EXIT_AFTER_BARS * 5):                     # Проверка: позиция ещё существует на бирже                     if position_size > 0:                         exit_price = pos['stop_price'] if hit_stop else current_price                         pnl = (                             (exit_price - pos['entry_price']) / pos['entry_price'] * 100                             if pos['type'] == 'long'                             else (pos['entry_price'] - exit_price) / pos['entry_price'] * 100                         )                         reason = "стоп-лосс" if hit_stop else "по времени"                         close_position(symbol, pos['type'], TRADE_QTY_ETH)                         bot.send_message(                             TELEGRAM_CHAT_ID,                             f"❌ Закрытие позиции: {pos['type'].upper()} по {exit_price:.2f} ({reason})\nPnL: {pnl:.2f}%"                         )                     else:                         # Позиция уже закрыта вручную/стопом вне кода                         bot.send_message(                             TELEGRAM_CHAT_ID,                             f"ℹ️ Позиция {pos['type'].upper()} уже закрыта на бирже. Удаляю из списка."                         )                      positions_to_remove.append(pos)              # Удаление обработанных/закрытых позиций             for p in positions_to_remove:                 if p in open_positions:                     open_positions.remove(p)      except Exception as e:         bot.send_message(TELEGRAM_CHAT_ID, f"❗ Ошибка: {e}")      time.sleep(3) 

Теперь по итогу имеем два файла. Main.py и back.py.

Main.py файл запустит основного бота. Перед этим желательно провести бектесты и изучив вывод консоли и .csv таблицу убедиться в том, что винрейт достаточный. Например, мой бот торгует по фьючам и процент чистой прибыли составил 173.86% за примерно год (100к свечей). То есть это 1738% прибыли на маржу. Но, конечно, стоит учитывать и комиссионные + проскальзывания. Комиссия байбит — 0,1% на маржу (0,01% на позицию). Также заклывал проскальзывание около 0,02%. Так что прибыль бота немного меньше. Однако если считать % на маржу, то вложив 1000$ и используя стандартном неизменяемое (на unified trading классическом аккаунте) плечо 10х, можно за год хорошо преумножить эту сумму.


ссылка на оригинал статьи https://habr.com/ru/articles/934602/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *