Привет, Хабр!
Я Python-разработчик с уклоном в алготрейдинг. В этой статье я расскажу, как написал простого торгового бота под Bybit, который торгует по классической стратегии на Bollinger Bands. Покажу базовую архитектуру, работу с API, индикаторы и открытие сделок. Подойдёт тем, кто только начинает работать с биржами и хочет потрогать живого бота руками.
📌 Что такое Bollinger Bands?
Полосы Боллинджера — это технический индикатор, основанный на скользящей средней и стандартном отклонении. По сути:
-
Середина (middle): простая скользящая средняя (обычно 20 периодов).
-
Верхняя полоса (upper): SMA + 2 * std.
-
Нижняя полоса (lower): SMA — 2 * std.
Идея: покупать при касании нижней полосы и продавать при касании верхней. Также нужно добавить индикаторы для повышения точности сделок, так как работа только с BB даст плохой результат.
🔍 Другие полезные индикаторы
Для более точной работы лучше использовать также доп индикаторы. Я использовал следущие:
1. RSI (Relative Strength Index)
-
Моментум-индикатор, показывает перекупленность/перепроданность.
-
RSI < 30 — перепроданность (лонг), RSI > 70 — перекупленность (шорт)
-
Отлично работает в паре с Bollinger Bands
2. EMA (Exponential Moving Average)
-
Быстрая реакция на изменения цены
-
Используется как фильтр тренда: цена выше EMA → приоритет лонг
3. ATR (Average True Range)
-
Измеряет волатильность
-
Используется для адаптивного стоп-лосса или фильтрации низковолатильных участков
4. CSI / CSC (Cluster Strength Index / Cluster Signal Confirmation)
-
Индикаторы, основанные на кластеризации свечей и плотности сигналов
-
Используются для подтверждения намерения цены двигаться в выбранном направлении
-
Применяются в авторских стратегиях с фильтрами
🛠️ Что будем использовать
-
requests— для доступа к данным. -
pybit— работа с API Bybit. -
pandas,numpy— расчёты индикатора. -
datetime,time— для контроля времени. -
threading— для многопоточности. -
telebot(опционально) — для уведомлений в Telegram.
📁 Структура проекта
bollinger_bot/ ├── main.py # Основная логика ├── back.py # Тестирование стратегии на исторических данных ├── config.cfg # конфиг файл с найстроками (опционально)
Для начала нам необходимо создать конфиг. Это можно сделать как отдельным файлом, так и просто вставить в код. Для простоты пока выберем второй вариант. Вот лучший конфиг, который я нашел за время бектестов:
import pandas as pd import numpy as np import time import datetime from collections import deque from scipy.stats import zscore from binance.client import Client as BinanceClient from pybit.unified_trading import HTTP import telebot # === НАСТРОЙКИ === symbol = "ETHUSDT" interval = "5m" bb_period = 40 bb_std = 1 STOP_LOSS_PCT = 0.004 client = BinanceClient() config = { 'min_cluster': 3, 'bull_quant': 0.75, 'bear_quant': 0.25, 'rsi': 60 }
📈 Получение исторических данных
Сначала подключим pybit и получим свечи:
def fetch_klines_paged(symbol=symbol, interval=interval, total_bars=100000, client=None): if client is None: client = Client() limit = 1000 data = [] end_time = int(time.time() * 1000) while len(data) < total_bars: bars_to_fetch = min(limit, total_bars - len(data)) try: klines = client.futures_klines(symbol=symbol, interval=interval, limit=bars_to_fetch, endTime=end_time) except Exception as e: print("Ошибка Binance API:", e) break if not klines: break data = klines + data end_time = klines[0][0] - 1 time.sleep(0.2) df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df[['open','high','low','close','volume']] = df[['open','high','low','close','volume']].astype(float) df = df.drop_duplicates('timestamp').sort_values('timestamp').reset_index(drop=True) return df
С помощью клиента bybit мы можем получить множество свечей из истории. Однако при вызове функции есть лимит в 1500 свечей. Поэтому необходимо будет внедрить функцию, способную подгрузить больше, для бектеста.
Хранилищем для свечей у нас будет выступать переменная df. Позже для расчёта индикаторов также придется использовать доп. функцию и будем вырезать из нее до 50000 свечей, для корректного расчёта кластеров. У свечей есть следующие аргументы:
-
timestamp — время открытия свечи
-
open — открытие свечи
-
high — максимум свечи
-
low — минимум свечи
-
close — закрытие свечи
-
volume — обьём, в данном коде он нам не нужен
Расчёт необходимых индикаторов
все технические особенности и математику индикаторов можно найти в открытым доступе. Для их описания можно написать большую отдельную статью, пока что остановимся на сухом коде. Из важных деталей можно сказать, что я использовал
-
period = 450 в расчёте RSI. Этот период показал наилучший результат с поправкой на ATR. Без него целесообразно использовать меньшие цифры.
-
В бектесте я рассчитываю индикаторы по всей длине бектеста (в моём случае 100000 свечей по 5м). Но в реальном боте, чтобы результат был аналогичен тесту, необходимо тоже подгружать исторические свечи, желательно тоже 50000-100000 шт.
-
В ATR.rolling() использую аргумент 14. Это сглаживание, 14 — классическое значение, тут оно мне вполне подходит.
def compute_rsi(df, period=450): delta = df['close'].diff() gain = delta.clip(lower=0) loss = -delta.clip(upper=0) avg_gain = gain.rolling(period, min_periods=1).mean() avg_loss = loss.rolling(period, min_periods=1).mean() rs = avg_gain / avg_loss df['RSI'] = 100 - (100 / (1 + rs)) df['RSI'] = df['RSI'].fillna(method='bfill') return df def compute_csc(df, min_cluster, bull_quant, bear_quant): bull_thr = df['CSI'].quantile(bull_quant) bear_thr = df['CSI'].quantile(bear_quant) df['sentiment'] = np.where(df['CSI'] >= bull_thr, 'bull', np.where(df['CSI'] <= bear_thr, 'bear', 'neutral')) df['cluster_id'] = pd.Series(dtype='object') curr_type, curr_start, length = None, None, 0 for i, s in df['sentiment'].items(): if s == curr_type and s in ['bull', 'bear']: length += 1 else: if curr_type in ['bull', 'bear'] and length >= min_cluster: df.loc[curr_start:i-1, 'cluster_id'] = f"{curr_type}_{curr_start}" if s in ['bull', 'bear']: curr_type, curr_start, length = s, i, 1 else: curr_type, length = None, 0 if curr_type in ['bull', 'bear'] and length >= min_cluster: df.loc[curr_start:df.index[-1], 'cluster_id'] = f"{curr_type}_{curr_start}" return df def compute_bollinger(df): df['ma'] = df['close'].rolling(bb_period).mean() df['std'] = df['close'].rolling(bb_period).std() df['upper'] = df['ma'] + bb_std * df['std'] df['lower'] = df['ma'] - bb_std * df['std'] return df def get_csi(df): body = (df['close'] - df['open']).abs() rng = (df['high'] - df['low']).replace(0, np.nan) body_ratio = body / rng direction = np.where(df['close'] > df['open'], 1, -1) vol_score = df['volume'] / df['volume'].rolling(50).max() range_z = zscore(df['high'] - df['low']).clip(-3, 3) tr = pd.DataFrame({ 'hl': df['high'] - df['low'], 'hc': (df['high'] - df['close'].shift(1)).abs(), 'lc': (df['low'] - df['close'].shift(1)).abs() }).max(axis=1) atr = tr.rolling(14).mean().bfill() df['CSI'] = direction * (0.5 * body_ratio + 0.3 * vol_score + 0.2 * range_z) / atr return df
Добавим функцию с условием входа
Уже ранее мы писали конфиг для бота. Конфиг используется как для расчёта индикаторов, так и для расчёта условия входа. Мой бот будет работать в две стороны — и в лонг, и в шорт. Важно сказать, что на практике формации в лонг отрабатывают на 10-15% чаще, что связанно с особенностью математики рынка (грубо говоря, падать тяжелее, чем расти).
def check_signal_row(row, prev_row): if np.isnan(row['lower']) or np.isnan(prev_row['CSI']) or np.isnan(row['CSI']): return None cluster = row['cluster_id'] if not isinstance(cluster, str): return None long_cond = ( row['close'] < row['lower'] and row['CSI'] > 0 and row['CSI'] > prev_row['CSI'] and cluster.startswith('bull') and row['RSI'] < config['rsi'] ) short_cond = ( row['close'] > row['upper'] and row['CSI'] < 0 and row['CSI'] < prev_row['CSI'] and cluster.startswith('bear') and row['RSI'] > (100 - config['rsi']) ) if long_cond: return 'buy' elif short_cond: return 'sell' return None
Теперь, после того как мы написали функцию проверки сигнала, можно подготовить логирование и протестировать наш код. Добавим переменные с сделками, где сработал сигнал, а также полный файл со всеми свечами в csv файл:
if __name__ == '__main__': df = fetch_klines_paged(symbol, interval, 100000, client) df = compute_rsi(df) df = compute_bollinger(df) df = get_csi(df) df = compute_csc(df, config['min_cluster'], config['bull_quant'], config['bear_quant']) signals = [None] for i in range(1, len(df)): signals.append(check_signal_row(df.iloc[i], df.iloc[i - 1])) df['signal'] = signals in_position = False entry_price = None entry_index = None position_type = None completed_trades = [] for i in range(1, len(df)): row = df.iloc[i] signal = row['signal'] # === Вход в позицию === if not in_position and signal in ['buy', 'sell']: in_position = True entry_index = i entry_price = row['close'] position_type = 'long' if signal == 'buy' else 'short' stop_price = ( entry_price * (1 - STOP_LOSS_PCT) if position_type == 'long' else entry_price * (1 + STOP_LOSS_PCT) ) # === Выход из позиции === elif in_position: exit_index = entry_index + 15 exit_row = df.iloc[i] low, high = exit_row['low'], exit_row['high'] hit_stop = ( low <= stop_price if position_type == 'long' else high >= stop_price ) if hit_stop or i >= exit_index: exit_price = stop_price if hit_stop else exit_row['close'] pnl = ( (exit_price - entry_price) / entry_price * 100 if position_type == 'long' else (entry_price - exit_price) / entry_price * 100 ) completed_trades.append({ 'entry_time': df.iloc[entry_index]['timestamp'], 'exit_time': df.iloc[i]['timestamp'], 'position_type': position_type, 'entry_price': entry_price, 'exit_price': exit_price, 'pnl_%': pnl, 'reason': 'stop_loss' if hit_stop else 'time_exit' }) in_position = False # === Сохраняем сделки === trades_df = pd.DataFrame(completed_trades) trades_df.to_csv('trades_complete.csv', sep=';', index=False) print("Последние сделки:") print(trades_df.tail(10)) total_pnl = trades_df['pnl_%'].sum() print(f"\nОбщий PnL по стратегии: {total_pnl:.2f}%")
Для удобства бектеста добавил в конце вывод общего результата по стратегии.
На этом мы закончили с проведением бектеста. Бектест — всегда очень важная часть написанного вами бота. Бектесты можно проводить разными методами, многие опытные программисты реализуют бектест с отрисовкой позиций на графике. Но это усложненный вариант, и, по-моему мнению это не стоит того.
Приступим к написанию основного бота
Логика абсолютно целиком сохраняется. Расчёт индикаторов аналогичный. Я добавил для удобства отправление сообщений в телеграмм бота, чтобы следить.
Очень важное отличие — нужно две функции подгрузки свечей. Одна для свечей в реальном времени, вторая для исторических данных (нужно для расчёта индикатора csc).
Также необходимо добавить в конфиг:
# === НАСТРОЙКИ === symbol = "ETHUSDT" interval = "5m" bb_period = 40 bb_std = 1 STOP_LOSS_PCT = 0.004 TRADE_QTY_ETH = 1 EXIT_AFTER_BARS = 3 #15 минут TELEGRAM_CHAT_ID = config.tg_id #свой chat_id # === API === bot = telebot.TeleBot("7871321841:AAGp9cbyLRCdO0VMfqK0v-x2eGCfIqebmVU") #tg bot BYBIT_API_KEY = config.apikey BYBIT_API_SECRET = config.bybit_secret bybit = HTTP(api_key=BYBIT_API_KEY, api_secret=BYBIT_API_SECRET) #bybit init # === КЛАСТЕРНЫЙ КОНФИГ === config = { 'min_cluster': 3, 'bull_quant': 0.75, 'bear_quant': 0.25, 'rsi': 60, 'total_bars': 60000 } entry_history = deque(maxlen=100) open_positions = [] client = BinanceClient()
Теперь реализуем подгрузку свечей:
def fetch_klines_paged(symbol=symbol, interval=interval, total_bars=60000, client = None): if client is None: client = BinanceClient() limit = 1000 data = [] end_time = None # самый последний бар (новейшая точка) while len(data) < total_bars: bars_to_fetch = min(limit, total_bars - len(data)) try: klines = client.futures_klines( symbol=symbol, interval=interval, limit=bars_to_fetch, endTime=end_time ) except Exception as e: print("Ошибка Binance API:", e) break if not klines: break data = klines + data # prepend! — старые свечи добавляем в начало end_time = klines[0][0] - 1 # сдвиг назад по времени time.sleep(0.2) df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df[['open','high','low','close','volume']] = df[['open','high','low','close','volume']].astype(float) df = df.drop_duplicates('timestamp') df = df.sort_values('timestamp').reset_index(drop=True) return df def get_last_closed_candle(): df = fetch_klines_paged(total_bars=50000) last_candle = df.iloc[-2] # предпоследняя — она закрыта now = datetime.datetime.now(datetime.timezone.utc) if (now - last_candle['timestamp'].to_pydatetime()).total_seconds() >= 300: return last_candle.to_frame().T else: print("⏳ Свеча ещё не закрыта. Пропускаем.") return None
Для упрощения я сделал функцию единой. Но в идеале добавлять функцию которая будет подгружать последние 100-200 свечей и также работать с ней. Это сказывается лишь на ресурсозатратности.
Также добавил def get_last_closed_candle(), чтобы отлеживать последнюю свечу и сигнал на ней. Можно было внедрять потоки, либо ещё хуже — time.sleep(), однако такой подход меня не устроил на дистанции. По опыту это работает гораздо хуже.
И теперь важный ньюанс, о котором сказал ранее. Необходимо добавить sub = df[], чтобы при расчёте индикатора кластеров (который работает на большом отрезке свечей) не возникло расхождения с бектестом. В других индикаторах это не требуется, так как в них и так используется <1000 свечей (максимум в rsi, 450).
def compute_csc(df): sub = df.tail(min(50000, len(df))) bull_thr = sub['CSI'].quantile(config['bull_quant']) bear_thr = sub['CSI'].quantile(config['bear_quant']) df['sentiment'] = np.where(df['CSI'] >= bull_thr, 'bull', np.where(df['CSI'] <= bear_thr, 'bear', 'neutral')) df['cluster_id'] = pd.Series(dtype='object') curr_type, curr_start, length = None, None, 0 for i, s in df['sentiment'].items(): if s == curr_type and s in ['bull','bear']: length += 1 else: if curr_type in ['bull','bear'] and length >= config['min_cluster']: df.loc[curr_start:i-1, 'cluster_id'] = f"{curr_type}_{curr_start}" if s in ['bull','bear']: curr_type, curr_start, length = s, i, 1 else: curr_type, length = None, 0 if curr_type in ['bull','bear'] and length >= config['min_cluster']: df.loc[curr_start:df.index[-1], 'cluster_id'] = f"{curr_type}_{curr_start}" return df
Теперь создадим функции входа в позицию. В первую очередь это открытие позиции с помощью pybit. Также сделал функцию, чтобы бот точно не открывал несколько позиций в одной 5м свече.
def place_order(symbol, side, qty_eth, stop_price): try: bybit.place_order( category="linear", symbol=symbol, side="Buy" if side == "long" else "Sell", order_type="Market", qty=qty_eth, time_in_force="GoodTillCancel", stopLoss=round(stop_price, 2) ) bot.send_message(TELEGRAM_CHAT_ID, f"✅ Открыта {side.upper()} позиция на {qty_eth} ETH") except Exception as e: print("Ошибка ордера:", e) def close_position(symbol, position_type, qty_eth): try: bybit.place_order( category="linear", symbol=symbol, side="Sell" if position_type == "long" else "Buy", order_type="Market", qty=qty_eth, time_in_force="GoodTillCancel" ) bot.send_message(TELEGRAM_CHAT_ID, f"🔻 Закрыта {position_type.upper()} позиция ({qty_eth} ETH)") except Exception as e: bot.send_message(TELEGRAM_CHAT_ID, f"❗ Ошибка при закрытии позиции: {e}") def can_enter_again(signal_type): now = datetime.datetime.now(datetime.timezone.utc) cooldown = 5 * 60 return not any((now - t).total_seconds() < cooldown and s == signal_type for t, s in entry_history)
Ну и добавим основную функцию с вызовом функций расчёта индикаторов:
bot.send_message(TELEGRAM_CHAT_ID, "📈 Бот запущен") df = fetch_klines_paged() last_checked_minute = None while True: try: now = datetime.datetime.now(datetime.timezone.utc) if now.minute % 5 == 0 and now.second < 10: if last_checked_minute == now.minute: time.sleep(1) continue last_checked_minute = now.minute new_df = get_last_closed_candle() if new_df is None: continue df = pd.concat([df, new_df.tail(1)]).drop_duplicates('timestamp').reset_index(drop=True) if len(df) > config['total_bars']: df = df.tail(config['total_bars']) df = compute_bollinger(df) df = get_csi(df) df = compute_csc(df) df = compute_rsi(df) df['signal'] = [None] + [check_signal_row(df.iloc[i], df.iloc[i - 1]) for i in range(1, len(df))] latest = df.iloc[-2] signal = latest['signal'] if signal in ['buy', 'sell'] and can_enter_again(signal): entry_price = latest['close'] stop_price = entry_price * (1 - STOP_LOSS_PCT) if signal == 'buy' else entry_price * (1 + STOP_LOSS_PCT) position_type = 'long' if signal == 'buy' else 'short' entry_time = datetime.datetime.now(datetime.timezone.utc) place_order(symbol, position_type, TRADE_QTY_ETH, stop_price) entry_history.append((entry_time, signal)) open_positions.append({ 'type': position_type, 'entry_price': entry_price, 'stop_price': stop_price, 'entry_time': entry_time }) positions_to_remove = [] current_price = latest['close'] updated_positions = [] for pos in open_positions[:]: entry_time = pos['entry_time'] elapsed = (datetime.utcnow() - entry_time).total_seconds() position_data = bybit.get_positions(category="linear", symbol=symbol)["result"]["list"] position_size = float(position_data[0]['size']) if position_data else 0 # Проверка на срабатывание стоп-лосса hit_stop = ( (pos['type'] == 'long' and current_price <= pos['stop_price']) or (pos['type'] == 'short' and current_price >= pos['stop_price']) ) if hit_stop or elapsed >= (EXIT_AFTER_BARS * 5): # Проверка: позиция ещё существует на бирже if position_size > 0: exit_price = pos['stop_price'] if hit_stop else current_price pnl = ( (exit_price - pos['entry_price']) / pos['entry_price'] * 100 if pos['type'] == 'long' else (pos['entry_price'] - exit_price) / pos['entry_price'] * 100 ) reason = "стоп-лосс" if hit_stop else "по времени" close_position(symbol, pos['type'], TRADE_QTY_ETH) bot.send_message( TELEGRAM_CHAT_ID, f"❌ Закрытие позиции: {pos['type'].upper()} по {exit_price:.2f} ({reason})\nPnL: {pnl:.2f}%" ) else: # Позиция уже закрыта вручную/стопом вне кода bot.send_message( TELEGRAM_CHAT_ID, f"ℹ️ Позиция {pos['type'].upper()} уже закрыта на бирже. Удаляю из списка." ) positions_to_remove.append(pos) # Удаление обработанных/закрытых позиций for p in positions_to_remove: if p in open_positions: open_positions.remove(p) except Exception as e: bot.send_message(TELEGRAM_CHAT_ID, f"❗ Ошибка: {e}") time.sleep(3)
Теперь по итогу имеем два файла. Main.py и back.py.
Main.py файл запустит основного бота. Перед этим желательно провести бектесты и изучив вывод консоли и .csv таблицу убедиться в том, что винрейт достаточный. Например, мой бот торгует по фьючам и процент чистой прибыли составил 173.86% за примерно год (100к свечей). То есть это 1738% прибыли на маржу. Но, конечно, стоит учитывать и комиссионные + проскальзывания. Комиссия байбит — 0,1% на маржу (0,01% на позицию). Также заклывал проскальзывание около 0,02%. Так что прибыль бота немного меньше. Однако если считать % на маржу, то вложив 1000$ и используя стандартном неизменяемое (на unified trading классическом аккаунте) плечо 10х, можно за год хорошо преумножить эту сумму.
ссылка на оригинал статьи https://habr.com/ru/articles/934602/
Добавить комментарий