Бэктестинг торговых стратегий на Python с помощью Numba. Когда перевод расчетов на GPU действительно оправдан?

от автора

Введение

Бэктестинг — ключевой процесс в алгоритмической торговле. Он позволяет проверить стратегию на исторических данных, прежде чем запускать её в реальной торговле. Однако, чем больше данных и сложнее логика стратегии, тем дольше времени занимают вычисления. Особенно если стратегия анализирует тиковые данные и требуется протестировать множество комбинаций гиперпараметров стратегии, время вычислений может расти экспоненциально.

В этой статье мы разберем, как реализовать бэктестинг на чистом Python, посмотрим сколько времени могут занимать вычисления, а также попробуем найти разные способы оптимизации.

Python, как известно — это интерпретируемый язык, что означает, что код выполняется построчно во время исполнения программы, а не компилируется в машинный код заранее, как это происходит, например, в C или C++. Это делает разработку быстрее и удобнее, так как можно сразу видеть результаты выполнения кода и легко отлаживать программы. Но этот же факт, в свою очередь, приводит к тому, что Python заметно уступает в скорости более низкоуровневым языкам. К тому же Python использует динамическую типизацию, что требует дополнительных проверок и снижает производительность и если данных очень много, это может приводить к значительным сложностям, связанным с увеличением времени вычислений.

Как же использовать ту легкость и скорость разработки Python и при этом сохранить адекватное время вычислений на больших объемах данных? В этой статье мы увидим, насколько перенос вычислений на GPU может увеличить производительность вычислений.

Описание стратегии

В качестве примера мы возьмём некоторую придуманную нами стратегию, под названием сетка ордеров. Эта стратегия заключается в том, чтобы выставлять ордера на покупку при снижении цены и продавать активы при их росте. Стратегия использует фиксированный шаг (grid_spread), который указывает, на какой процент должен измениться курс, прежде чем трейдер совершит покупку или продажу. Сразу оговорюсь, что в рамках этой статьи мы не привязываемся к конкретной стратегии, а просто используем ее в качестве демонстрационного примера. На самом же деле же стратегия может быть любой.

Итак, коротко о том, как работает стратегия:

  • Основные параметры:

    • grid_spread — расстояние между уровнями покупки и продажи.

    • position_size — объем позиции, на который мы совершаем сделку.

  1. Начало торговли:

    Стратегия начинается с открытия двух покупок актива. Эти покупки выполняются по начальной цене, чтобы создать начальную позицию

  2. Покупка при падении цены:

    Если цена актива падает на заданный процент (grid_spread) от последней покупки, совершается новая покупка. Это позволяет усреднить цену покупки и накапливать актив на более выгодных уровнях.

  3. Продажа при росте цены:

    Когда цена актива растет и достигает уровня grid_spread от последней покупки, происходит продажа. Это позволяет зафиксировать прибыль, полученную за счет роста цены.

  4. При росте цены оставляем минимум одну позицию:

    Если цена постоянно идет в нашу сторону, то всегда оставляем минимум одну открытую позицию, следующая покупка должна состояться при отклонении цены на два уровня (grid_spread) вниз от ее локального максимума.

  5. Учет комиссий за сделки:

    Каждая сделка включает комиссию биржи (trading_fee). Этот фактор должен учитываться при вычислениях стоимости сделок, а также в расчетах баланса и прибыли.

Тестирование

Мы проведем бэктестинг, варьируя параметры grid_spread и position_size, чтобы определить, какие комбинации могли привести к наибольшей прибыли или наименьшему риску. Задача — не просто протестировать стратегию, но и найти её оптимальные параметры.

Определим диапазоны значений для grid_spread и position_size, которые будем тестировать:

import numpy as np  grid_spreads = np.arange(0.01, 0.26, 0.01)  # Сетка от 1% до 25% с шагом 1% position_sizes = np.arange(10, 310, 10)  # Размер позиции от 10 до 300 USDT с шагом 10 USDT

Нетрудно посчитать, что у нас будет 750 разных комбинаций grid_spreads и position_size, т.е. 750 разных трейдеров.

Скрытый текст

(количество вариантов grid_spreads) х (количество вариантов position_sizes) = 25х30 = 750 комбинаций

В качестве примера тестовых данных ценовых котировок возьмем исторические данные о цене биткоина за 2024 год, загруженных с API Binance. API Binance позволяет загружать исторические данные ценовых котировок разного формата, мы же для проверки нашей стратегии будем использовать данные агрегированных сделок (Aggregate Trades) Aggregate Trades — торговые операции, которые представляют собой сгруппированные сделки, произошедшие в определённый момент времени, где несколько индивидуальных сделок произошедших по одной цене объединяются в одну запись.

Скрытый текст

Мы не будем углубляться в подробности выбора именно агрегированных сделок для анализа торговых стратегий, но стоит отметить, что информация с обычных свечных графиков часто недостаточна для полного понимания хронологии изменений цен. Агрегированные сделки предоставляют более точную картину, так как они сохраняют полную хронологию изменений цен, а сделки с одинаковыми ценами группируются в одну, что позволяет существенно сократить объем обрабатываемых данных по сравнению с тиковыми данными, делая анализ более эффективным.

Загрузив данные мы получим Pandas датафрейм df_aggtrades размером 551+ млн. строк с информацией о ценах о объемах сделок за конкретные метки времени:

price

quantity

timestamp

2024-01-01 00:00:00.000

42283.578125

0.00069

2024-01-01 00:00:00.001

42283.589844

0.00144

2024-01-01 00:00:00.003

42283.578125

0.00102

2024-01-01 00:00:00.003

42283.589844

0.00219

2024-01-01 00:00:00.004

42283.589844

0.00163

2024-12-31 23:59:58.241

93576.007812

0.00060

2024-12-31 23:59:58.250

93576.007812

0.00023

2024-12-31 23:59:59.067

93576.000000

0.00173

2024-12-31 23:59:59.950

93576.000000

0.00022

2024-12-31 23:59:59.951

93576.000000

0.00022

551044283 rows × 2 columns

Python

Попробуем, для начала, реализовать код тестирования стратегии на чистом Python:

class GridTrading:     """     Класс для реализации стратегии Grid Trading.          Стратегия заключается в поэтапной покупке актива при падении цены и продаже при росте     с фиксированным шагом (`grid_spread`).     """          def __init__(self, initial_timestamp, initial_price, initial_balance=1000, trading_fee=0.001):         """         Инициализирует стратегию Grid Trading.                  :param initial_timestamp: Начальный временной штамп.         :param initial_price: Начальная цена актива.         :param initial_balance: Начальный баланс в USDT (по умолчанию 1000).         :param trading_fee: Комиссия за сделку (по умолчанию 0.001, то есть 0.1%).         """         self.initial_timestamp = initial_timestamp # Временная метка начала торговли         self.initial_price = initial_price # Цена актива в момент старта стратегии         self.grid_spread = None  # Устанавливается в update_strategy         self.position_size = None  # Устанавливается в update_strategy         self.tot_comission = 0  # Общая сумма комиссий         self.count_positions = 0  # Количество открытых позиций         self.balance = initial_balance  # Баланс в USDT         self.coin_amount = 0  # Количество приобретенных монет         self.positions = []  # Открытые позиции (timestamp, price, amount)         self.trading_history = []  # История всех операций         self.loc_max_price = None  # Локальный максимум цены         self.trading_fee = trading_fee  # Комиссия за торговлю          def update_strategy(self, grid_spread, position_size):         """         Обновляет параметры стратегии.                  :param grid_spread: Шаг сетки (процентное расстояние между уровнями покупки/продажи).         :param position_size: Размер позиции для каждой сделки.         """         self.grid_spread = grid_spread         self.position_size = position_size          # Инициализация первых двух покупок         for _ in range(2):             self.buy_position(self.initial_timestamp, self.initial_price)              def update_trading(self, timestamp, current_price):         """         Обновляет состояние стратегии в зависимости от текущей цены.                  :param timestamp: Текущий временной штамп.         :param current_price: Текущая цена актива.         """         if self.count_positions == 1 and (self.loc_max_price is None or current_price > self.loc_max_price):             self.loc_max_price = current_price             return                  if self.count_positions == 1 and current_price <= self.loc_max_price * (1 - 2 * self.grid_spread):             self.buy_position(timestamp, current_price)             self.loc_max_price = None         elif self.count_positions > 1 and current_price <= self.positions[-1][1] * (1 - self.grid_spread):             self.buy_position(timestamp, current_price)         elif current_price >= self.positions[-1][1] * (1 + self.grid_spread):             self.sell_position(timestamp, current_price)      def buy_position(self, timestamp, price):         """         Совершает покупку актива.                  :param timestamp: Временной штамп сделки.         :param price: Цена покупки.         """         comission = self.position_size * self.trading_fee         cost = self.position_size + comission          if self.balance >= cost:             btc_bought = self.position_size / price             self.count_positions += 1             self.balance -= cost             self.tot_comission += comission              self.coin_amount += btc_bought             self.positions.append((timestamp, price, btc_bought))             self.trading_history.append({                 'Action': 'BUY',                 'Time': timestamp,                 'Price': price,                 'Amount': btc_bought,                 'Balance USDT': self.balance,                 'Total_balance': self.balance + self.coin_amount * price,                 'Total BTC amount': self.coin_amount             })          def sell_position(self, timestamp, price):         """         Совершает продажу актива.                  :param timestamp: Временной штамп сделки.         :param price: Цена продажи.         """         if self.count_positions > 1:             _, buy_price, btc_sold = self.positions.pop()  # Продаем по цене последней покупки             comission = btc_sold * price * self.trading_fee             revenue = btc_sold * price - comission             self.tot_comission += comission             self.count_positions -= 1             self.balance += revenue             self.coin_amount -= btc_sold             self.loc_max_price = None             self.trading_history.append({                 'Action': 'SELL',                 'Time': timestamp,                 'Price': price,                 'Amount': btc_sold,                 'Balance USDT': self.balance,                 'Total_balance': self.balance + self.coin_amount * price,                 'Total BTC amount': self.coin_amount             }) 

Код запуска симуляций будет иметь вид:

def run_trading_simulation(df, grid_spreads, position_sizes):     """     Запускает симуляцию торговли с использованием стратегии Grid Trading.      Параметры:     - df (pandas.DataFrame): DataFrame с историческими данными, содержащий столбец 'price' для цен и индекс для временных меток.     - grid_spreads (np.ndarray): Массив с возможными значениями спредов для сетки.     - position_sizes (np.ndarray): Массив с возможными размерами позиций для торговли.      Возвращает:     - traders (list): Список объектов трейдеров, которые прошли через симуляцию торговли.     """     prices = df['price'].values  # Извлечение цен из DataFrame     timestamps = df.index.values  # Извлечение временных меток     traders = []  # Список для хранения объектов трейдеров          # Проходим по всем возможным комбинациям спредов и размеров позиций     for grid_spread in grid_spreads:         for position_size in position_sizes:             # Инициализация трейдера с начальной ценой и временной меткой             trader = GridTrading(timestamps[0], prices[0])             trader.update_strategy(grid_spread, position_size)  # Обновление стратегии для текущего трейдера                          # Перебор всех цен и обновление торговых решений             for i in range(len(prices)):                 price = prices[i]  # Извлекаем текущую цену                 timestamp = timestamps[i]  # Извлекаем соответствующий временной штамп                 trader.update_trading(timestamp, price)  # Обновление торгового решения для текущей цены              traders.append(trader)  # Добавляем трейдера в список      return traders  # Возвращаем список трейдеров, прошедших через симуляцию 

Все вычисления мы будем проводить на платформе Kaggle. Это облачная среда для анализа данных и машинного обучения, предоставляющая удобные инструменты для работы с кодом. Если у вас нет мощного железа или GPU, Kaggle позволяет бесплатно использовать облачные GPU (например, Tesla T4 или P100) для ускорения вычислений.

Запустим вычисления сначала для одного трейдера и замерим среднее время выполнения кода

%timeit -r 3 -n 1 trader = run_trading_simulation(df_aggtrades, grid_spreads[:1], position_sizes[:1])  11min 21s ± 1.36 s per loop (mean ± std. dev. of 3 runs, 1 loop each)

Видим, что работает это не сильно быстро. Расчет даже для одного трейдера занимает в среднем больше 11 минут. А у нас таких трейдеров (комбинаций grid_spread и position_size) 750 шт. Нетрудно посчитать, что, если запустим вычисления в таком виде, выполняться они будут около 6 дней!

Надо явно что-то с этим делать…

Numba

Что если вынести весь проход по датафрейму для поиска ключевых точек (точек покупки и продажи) в отдельную функцию и скомпилировать её? Это позволит ускорить выполнение кода в разы! Для этого мы воспользуемся библиотекой Numba, которая с помощью JIT-компиляции (Just-In-Time) преобразует Python-код в машинный код прямо во время выполнения, значительно повышая его производительность.

Преимущества Numba:

✔ Многократное ускорение без необходимости переписывать код на C/C++.
✔ Простота использования — достаточно добавить декоратор @njit.
✔ Эффективная оптимизация циклов и числовых вычислений.

Если вас интересуют детали работы Numba и методы её оптимизации, то с этим можно ознакомиться в документации Numba или с материалами на Habr-е здесь, здесь и здесь. Мы же сразу перейдём к практике.

Поскольку ключевых торговых точек в стратегии будет гораздо меньше, чем общее количество записей в датафрейме, то в дальнейшем мы сможем достаточно быстро обработать их обычным Python-циклом, формируя историю сделок для последующего анализа.

Итак, модифицированный код выглядит следующим образом:

from numba import njit from numba.typed import List  @njit(inline='always') def buy_position(i, current_price, balance, positions, trade_points_indices, trade_points_actions, trade_points_prices, position_size, trading_fee, count):     """     Выполняет покупку позиции и обновляет баланс, а также информацию о сделке.      Параметры:     - i (int): Индекс текущей торговой точки.     - current_price (float): Текущая цена актива.     - balance (float): Доступный баланс для выполнения сделки.     - positions (numba.typed.List): Список текущих позиций.     - trade_points_indices (np.ndarray): Массив для хранения индексов торговых точек.     - trade_points_actions (np.ndarray): Массив для хранения действий на торговых точках.     - trade_points_prices (np.ndarray): Массив для хранения цен на торговых точках.     - position_size (float): Размер позиции.     - trading_fee (float): Торговая комиссия.     - count (int): Счётчик числа сделок.      Возвращает:     - Новый баланс после выполнения сделки.     """     trade_points_indices[count] = i     trade_points_actions[count] = 0  # 0 для BUY     trade_points_prices[count] = current_price     amount = position_size / current_price  # Количество актива, которое можно купить     cost = position_size * (1 + trading_fee)  # Стоимость позиции с учётом комиссии     positions.append((current_price, amount))  # Добавление новой позиции     return balance - cost  # Обновление баланса после покупки  @njit(inline='always') def sell_position(i, current_price, balance, positions, trade_points_indices, trade_points_actions, trade_points_prices, position_size, trading_fee, count):     """     Выполняет продажу позиции и обновляет баланс, а также информацию о сделке.      Параметры:     - i (int): Индекс текущей торговой точки.     - current_price (float): Текущая цена актива.     - balance (float): Доступный баланс для выполнения сделки.     - positions (numba.typed.List): Список текущих позиций.     - trade_points_indices (np.ndarray): Массив для хранения индексов торговых точек.     - trade_points_actions (np.ndarray): Массив для хранения действий на торговых точках.     - trade_points_prices (np.ndarray): Массив для хранения цен на торговых точках.     - position_size (float): Размер позиции.     - trading_fee (float): Торговая комиссия.     - count (int): Счётчик числа сделок.      Возвращает:     - Новый баланс после выполнения сделки.     """     trade_points_indices[count] = i     trade_points_actions[count] = 1  # 1 для SELL     trade_points_prices[count] = current_price     buy_price, btc_sold = positions.pop()  # Извлечение информации о купленной позиции     revenue = btc_sold * current_price - position_size * trading_fee  # Выручка от продажи с учётом комиссии     return balance + revenue  # Обновление баланса после продажи  @njit def find_trade_points(prices, grid_spread, position_size, initial_balance=1000, trading_fee=0.001):     """     Находит торговые точки для покупки и продажи на основе стратегии с сеткой.      Параметры:     - prices (np.ndarray): Массив цен актива.     - grid_spread (float): Значение спреда сетки для торговли.     - position_size (float): Размер позиции.     - initial_balance (float): Начальный баланс для торговли.     - trading_fee (float): Торговая комиссия.      Возвращает:     - trade_points_indices (np.ndarray): Индексы торговых точек.     - trade_points_actions (np.ndarray): Действия на торговых точках (BUY/SELL).     - trade_points_prices (np.ndarray): Цены на торговых точках.     """     positions = List()  # Список для хранения позиций     balance = initial_balance  # Инициализация баланса     loc_max_price = -np.inf  # Инициализация максимальной цены     count = 0  # Счётчик сделок     n = len(prices)  # Количество цен (данных)          # Массивы для хранения информации о торговых точках     trade_points_indices = np.empty(n, dtype=np.int32)     trade_points_actions = np.empty(n, dtype=np.int32)     trade_points_prices = np.empty(n, dtype=np.float64)          # Открытие первой позиции (покупка)     for i in range(2):         balance = buy_position(0, prices[0], balance, positions, trade_points_indices, trade_points_actions, trade_points_prices, position_size, trading_fee, count)         count += 1      # Поиск торговых точек для покупки/продажи на основе цены     for i in range(n):         current_price = prices[i]         if len(positions) == 1 and current_price > loc_max_price:             loc_max_price = current_price         elif len(positions) == 1 and current_price <= loc_max_price * (1 - 2 * grid_spread):             cost = position_size * (1 + trading_fee)             if balance >= cost:                 balance = buy_position(i, current_price, balance, positions, trade_points_indices, trade_points_actions, trade_points_prices, position_size, trading_fee, count)                 count += 1                 loc_max_price = -np.inf         elif len(positions) > 1 and current_price <= positions[-1][0] * (1 - grid_spread):             cost = position_size * (1 + trading_fee)             if balance >= cost:                 balance = buy_position(i, current_price, balance, positions, trade_points_indices, trade_points_actions, trade_points_prices, position_size, trading_fee, count)                 count += 1                   elif len(positions) > 1 and current_price >= positions[-1][0] * (1 + grid_spread):             balance = sell_position(i, current_price, balance, positions, trade_points_indices, trade_points_actions, trade_points_prices, position_size, trading_fee, count)             count += 1             loc_max_price = -np.inf          return (trade_points_indices[:count],              trade_points_actions[:count],              trade_points_prices[:count]) 

Скорректируем код класса, описывающего логику одного трейдера, заменив метод update_trading на метод process_trade.

class FastGridTrading:     """     Класс для симуляции торговой стратегии на основе сетки (Grid Trading).     Хранит информацию о позициях, балансе, комиссии и истории сделок для трейдера.     """      def __init__(self, initial_timestamp, initial_price, initial_balance=1000, trading_fee=0.001):         """         Инициализирует объект трейдера с заданными параметрами.          :param initial_timestamp: Начальная метка времени для первой сделки.         :param initial_price: Начальная цена для первой сделки.         :param initial_balance: Начальный баланс (по умолчанию 1000).         :param trading_fee: Торговая комиссия за сделку (по умолчанию 0.001).         """         self.position_size = None  # Размер позиции для каждой сделки         self.grid_spread = None  # Расстояние между уровнями в стратегии Grid         self.tot_comission = 0  # Общая комиссия, уплаченная за сделки         self.count_positions = 0  # Количество открытых позиций         self.balance = initial_balance  # Баланс в валюте (например, USDT)         self.coin_amount = 0  # Количество купленных монет         self.positions = []  # Список открытых позиций         self.trading_history = []  # История сделок (покупка/продажа)         self.loc_max_price = None  # Максимальная цена на текущий момент         self.trading_fee = trading_fee  # Комиссия за сделку      def update_strategy(self, grid_spread, position_size):         """         Обновляет параметры стратегии для торговли.          :param grid_spread: Расстояние между уровнями в стратегии Grid.         :param position_size: Размер позиции для каждой сделки.         """         self.grid_spread = grid_spread         self.position_size = position_size      def process_trade(self, timestamp, action, price):         """         Обрабатывает сделку на основе действия (покупка или продажа).          :param timestamp: Время выполнения сделки.         :param action: Тип действия ('BUY' или 'SELL').         :param price: Цена сделки.         """         if action == 'BUY':             self.buy_position(timestamp, price)  # Покупка         elif action == 'SELL':             self.sell_position(timestamp, price)  # Продажа      def buy_position(self, timestamp, price):         """         Выполняет покупку позиции по заданной цене.          :param timestamp: Время сделки.         :param price: Цена покупки.         """         comission = self.position_size * self.trading_fee         cost = self.position_size + comission  # Стоимость покупки (с учетом комиссии)         if self.balance >= cost:  # Проверка, что достаточно средств             btc_bought = self.position_size / price  # Количество купленных монет             self.count_positions += 1  # Увеличиваем количество открытых позиций             self.balance -= cost  # Уменьшаем баланс на стоимость покупки             self.tot_comission += comission  # Увеличиваем общую комиссию             self.coin_amount += btc_bought  # Увеличиваем количество монет             self.positions.append((timestamp, price, btc_bought))  # Добавляем позицию             self.trading_history.append({                 'Action': 'BUY',                                 'Time': timestamp,                  'Price': price,                  'Amount': btc_bought,                 'Balance USDT': self.balance,                  'Total_balance': self.balance + self.coin_amount * price,                  'Total BTC amount': self.coin_amount             })  # Добавляем информацию о сделке в историю          def sell_position(self, timestamp, price):         """         Выполняет продажу позиции по заданной цене.          :param timestamp: Время сделки.         :param price: Цена продажи.         """         if self.count_positions > 1:  # Продажа возможна только если есть открытые позиции             _, buy_price, btc_sold = self.positions.pop()  # Извлекаем последнюю купленную позицию             comission = btc_sold * price * self.trading_fee             revenue = btc_sold * price - comission  # Выручка от продажи за вычетом комиссии             self.tot_comission += comission  # Увеличиваем общую комиссию             self.count_positions -= 1  # Уменьшаем количество открытых позиций             self.balance += revenue  # Увеличиваем баланс на выручку от продажи             self.coin_amount -= btc_sold  # Уменьшаем количество монет             self.loc_max_price = None  # Сбрасываем максимальную цену             self.trading_history.append({                 'Action': 'SELL',                 'Time': timestamp,                  'Price': price,                  'Amount': btc_sold,                  'Balance USDT': self.balance,                  'Total_balance': self.balance + self.coin_amount * price,                 'Total BTC amount': self.coin_amount             })  # Добавляем информацию о сделке в историю

Код для запуска симуляции примет вид:

def run_trading_simulation_njit(df, grid_spreads, position_sizes):     """     Запускает симуляцию торговли с использованием метода `find_trade_points` для поиска торговых точек.      Параметры:     - df (pandas.DataFrame): DataFrame с историческими данными. Должен содержать столбцы 'close' и 'price' с ценами.     - grid_spreads (np.ndarray): Массив с возможными значениями спредов для сетки.     - position_sizes (np.ndarray): Массив с возможными размерами позиций.          Возвращает:     - traders (list): Список объектов трейдеров, которые прошли через симуляцию.     """     # Выбор цен для симуляции в зависимости от значения флага klines     prices = df['price'].values             timestamps = df.index.values         traders = []      # Цикл по всем возможным значениям спредов и размеров позиций     for grid_spread in grid_spreads:         for position_size in position_sizes:             # Находим торговые точки с помощью функции find_trade_points             indices, actions, trade_prices = find_trade_points(prices, grid_spread, position_size)                          # Инициализация трейдера с начальной ценой и временной меткой             trader = FastGridTrading(timestamps[0], prices[0])             trader.update_strategy(grid_spread, position_size)              # Обработка найденных торговых точек             for idx, action, price in zip(indices, actions, trade_prices):                 if 0 <= idx < len(timestamps):  # Проверка, что индекс в пределах допустимого диапазона                     if action == 0:                         trader.process_trade(timestamps[idx], 'BUY', price)  # Покупка                     elif action == 1:                         trader.process_trade(timestamps[idx], 'SELL', price)  # Продажа              traders.append(trader)  # Добавляем трейдера в список      return traders 

В новой версии кода, представленной в run_trading_simulation_njit, произошли следующие изменения по сравнению с предыдущей версией run_trading_simulation:

  1. Использование функции find_trade_points для определения торговых точек: В новом коде добавлен вызов функции find_trade_points, которая позволяет заранее вычислить индексы, действия (покупка или продажа) и цены для каждой торговой точки. Это значительно упрощает основной цикл симуляции, так как торговые решения уже вычисляются заранее, и трейдеру нужно лишь обработать их по мере прохождения через временные метки.

  2. Замена трейдера GridTrading на FastGridTrading: В новой версии кода для обработки торговых решений используется класс FastGridTrading. Логика принятия торговых решений была вынесена в JIT-компилируемую функцию find_trade_points, а FastGridTrading теперь служит исключительно для инкапсуляции данных о торговле в объект, позволяя более эффективно управлять процессом.

  3. Значительное сокращение количества итераций Python-цикла: Вместо того чтобы для каждой цены вызывать функцию обновления стратегии через update_trading, в новой версии кода для каждого индекса и действия из результатов функции find_trade_points вызывается метод process_trade. Это позволяет значительно сократить число итераций Python-циклов.

Запустим симуляцию и замерим среднее время выполнения так же для одной комбинации значений grid_spreads и position_sizes:

%timeit -r 3 -n 1 trader_njit = run_trading_simulation_njit(df_aggtrades, grid_spreads[:1], position_sizes[:1])  33.1 s ± 28.2 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

Благодаря JIT-компиляции Numba удалось значительно ускорить выполнение кода по сравнению с обычным Python. В среднем расчет одной комбинации теперь занимает около 33 секунд — это уже ощутимый прирост, но все же недостаточно. При последовательном переборе 750 комбинаций общее время выполнения составит примерно 6.9 часов!

Очевидно, что даже с текущей оптимизацией обработка такого объема данных займет слишком много времени.

Как можно ускорить вычисления еще сильнее? Причем речь идет не о небольшом улучшении, а об оптимизации на порядки.

Numba + GPU

Одним из самых мощных способов ускорения вычислений является перенос их на GPU. В отличие от CPU, который выполняет задачи последовательно или в небольшом количестве потоков, GPU (графический процессор) обладает тысячами ядер, способными выполнять вычисления параллельно. GPU идеально подходит для задач, содержащих большое количество однотипных операций.

Numba позволяет перенести вычисления на GPU практически так же легко. С помощью декоратора @cuda.jit можно писать код для CUDA-ядра прямо на Python, не углубляясь в языки низкого уровня, такие как C++/CUDA.

Это означает, что мы можем переписать наши вычисления так, чтобы они выполнялись параллельно на тысячах потоков GPU, используя почти тот же Python-код, но с небольшими изменениями.

Преимущества Numba для работы с GPU:
✔ Простота – достаточно заменить @njit на @cuda.jit и настроить потоки.
✔ Сильное ускорение – особенно на больших массивах данных.
✔ Гибкость – можно комбинировать CPU и GPU-вычисления.

В основе алгоритма по-прежнему будет лежать поиск торговых точек, но теперь он будет выполняться параллельно для всех 750 комбинаций сразу.

Однако @cuda.jit накладывает больше ограничений, чем @njit. Например, он не поддерживает возврат значений из функций, что потребует некоторых изменений в коде. Вместо возврата результатов мы будем сохранять и изменять значения непосредственно в массивах, а также использовать указатели для управления данными.

Этот подход позволит полностью задействовать мощность GPU, выполняя вычисления одновременно для множества параметров, что приведет к значительному ускорению процесса.

После ряда доработок код ядра CUDA принял следующий вид:

from numba import cuda  @cuda.jit(device=True, inline=True) def buy_pos(thread_id, k, current_price, position_size, balance, trading_fee, active_position_count, count,              trade_points_indices, trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts):     """     Функция для открытия позиции (покупки).          Аргументы:     thread_id: int - Идентификатор потока.     k: int - Индекс текущей цены.     current_price: float - Текущая цена актива.     position_size: float - Размер позиции для покупки.     balance: array - Баланс трейдера.     trading_fee: float - Комиссия за торговлю.     active_position_count: array - Количество активных позиций.     count: array - Счетчик количества сделок.     trade_points_indices, trade_points_actions, trade_points_prices,     trade_points_positions, trade_points_amounts: arrays - Массивы для хранения данных о сделках.     """     cost = position_size * (1 + trading_fee)     if balance[0] >= cost:         amount = position_size / current_price         balance[0] -= cost                  trade_points_indices[thread_id, count[0]] = k         trade_points_actions[thread_id, count[0]] = 2  # BUY         trade_points_prices[thread_id, count[0]] = current_price         trade_points_positions[thread_id, active_position_count[0]] = current_price         trade_points_amounts[thread_id, active_position_count[0]] = amount                  active_position_count[0] += 1         count[0] += 1          @cuda.jit(device=True, inline=True) def sell_pos(thread_id, k, current_price, position_size, balance, trading_fee, active_position_count, count,               trade_points_indices, trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts):     """     Функция для закрытия позиции (продажи).          Аргументы аналогичны buy_pos.     """     sold_amount = trade_points_amounts[thread_id, active_position_count[0] - 1]     revenue = sold_amount * current_price - position_size * trading_fee     balance[0] += revenue          trade_points_indices[thread_id, count[0]] = k     trade_points_actions[thread_id, count[0]] = 1  # SELL     trade_points_prices[thread_id, count[0]] = current_price      active_position_count[0] -= 1     trade_points_positions[thread_id, active_position_count[0]] = 0     trade_points_amounts[thread_id, active_position_count[0]] = 0          count[0] += 1  @cuda.jit def find_trade_points_cuda(prices, grid_spreads, position_sizes, initial_balance, trading_fee, trade_points_indices,                             trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts, max_trades):     """     Основное CUDA-ядро для поиска торговых точек.          Аргументы:     prices: array - Массив цен актива.     grid_spreads: array - Массив шагов сетки.     position_sizes: array - Размеры позиций.     initial_balance: float - Начальный баланс трейдера.     trading_fee: float - Комиссия за сделку.     trade_points_*: arrays - Массивы для хранения информации о сделках.     max_trades: int - Максимальное количество сделок для каждого трейдера (для предотвращения переполнения памяти).     """     i, j = cuda.grid(2)  # Двумерная индексация потоков     if i < grid_spreads.shape[0] and j < position_sizes.shape[0]:         thread_id = i * position_sizes.shape[0] + j  # Преобразование двумерного индекса в одномерный         grid_spread = grid_spreads[i]  # Получаем параметры конкретного трейдера         position_size = position_sizes[j]         loc_max_price = -1e10          balance = cuda.local.array(1, dtype=np.float32)         balance[0] = initial_balance          count = cuda.local.array(1, dtype=np.int32)         count[0] = 0          active_position_count = cuda.local.array(1, dtype=np.int32)         active_position_count[0] = 0                  # Покупаем первые две позиции         current_price = prices[0]         for _ in range(2):             buy_pos(thread_id, 0, current_price, position_size, balance, trading_fee, active_position_count, count,                      trade_points_indices, trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts)          for k in range(1, prices.shape[0]):             if count[0] >= max_trades:                 break             current_price = prices[k]             active_position_price = trade_points_positions[thread_id, active_position_count[0] - 1]                          if active_position_count[0] == 1 and current_price > loc_max_price:                 loc_max_price = current_price             elif active_position_count[0] == 1 and current_price <= loc_max_price * (1 - 2 * grid_spread):                 buy_pos(thread_id, k, current_price, position_size, balance, trading_fee, active_position_count, count,                          trade_points_indices, trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts)                 loc_max_price = -1e10             elif active_position_count[0] > 1 and current_price <= active_position_price * (1 - grid_spread):                 buy_pos(thread_id, k, current_price, position_size, balance, trading_fee, active_position_count, count,                          trade_points_indices, trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts)             elif active_position_count[0] > 1 and current_price >= active_position_price * (1 + grid_spread):                   sell_pos(thread_id, k, current_price, position_size, balance, trading_fee, active_position_count, count,                           trade_points_indices, trade_points_actions, trade_points_prices, trade_points_positions, trade_points_amounts)                 loc_max_price = -1e10

Код для запуска симуляции торговых стратегий примет вид:

def run_trading_simulation_cuda(df, grid_spreads, position_sizes):     """     Запускает симуляцию торговли с использованием GPU для поиска торговых точек.      Параметры:     - df (pandas.DataFrame): DataFrame с историческими данными, должен содержать столбец 'price' с ценами.     - grid_spreads (np.ndarray): Массив с возможными значениями спредов для сетки.     - position_sizes (np.ndarray): Массив с возможными размерами позиций.      Возвращает:     - traders (list): Список объектов трейдеров, которые прошли через симуляцию.     """     # Извлечение цен и временных меток     prices = df['price'].values.astype(np.float32)     timestamps = df.index.values     traders = []      n = len(prices)     n_traders = len(grid_spreads) * len(position_sizes)  # Общее количество трейдеров     max_trades = 5000  # Максимальное количество сделок на трейдера      # Инициализация массивов для хранения информации о сделках     trade_points_indices = np.zeros((n_traders, max_trades), dtype=np.int32)     trade_points_actions = np.zeros((n_traders, max_trades), dtype=np.int32)     trade_points_prices = np.zeros((n_traders, max_trades), dtype=np.float32)     trade_points_positions = np.zeros((n_traders, max_trades), dtype=np.float32)     trade_points_amounts = np.zeros((n_traders, max_trades), dtype=np.float32)      # Выделение памяти на GPU     d_prices = cuda.to_device(prices)     d_trade_points_indices = cuda.to_device(trade_points_indices)     d_trade_points_actions = cuda.to_device(trade_points_actions)     d_trade_points_prices = cuda.to_device(trade_points_prices)     d_trade_points_positions = cuda.to_device(trade_points_positions)     d_trade_points_amounts = cuda.to_device(trade_points_amounts)     d_grid_spreads = cuda.to_device(grid_spreads)     d_position_sizes = cuda.to_device(position_sizes)      # Определение размера сетки CUDA     threads_per_block = (8, 8)  # 8x8 потоков в блоке     blocks_per_grid = ((grid_spreads.shape[0] + threads_per_block[0] - 1) // threads_per_block[0],                         (position_sizes.shape[0] + threads_per_block[1] - 1) // threads_per_block[1])      # Запуск ядра CUDA для поиска торговых точек     find_trade_points_cuda[blocks_per_grid, threads_per_block](d_prices, d_grid_spreads, d_position_sizes, 1000.0, 0.001,                                                                d_trade_points_indices, d_trade_points_actions,                                                                 d_trade_points_prices, d_trade_points_positions,                                                                 d_trade_points_amounts, max_trades)      # Синхронизация устройства (GPU)     cuda.synchronize()      # Копирование результатов обратно в память CPU     trade_points_indices = d_trade_points_indices.copy_to_host()     trade_points_actions = d_trade_points_actions.copy_to_host()     trade_points_prices = d_trade_points_prices.copy_to_host()      # Обработка результатов и создание трейдеров     for i, grid_spread in enumerate(grid_spreads):         for j, position_size in enumerate(position_sizes):             trader_id = i * position_sizes.shape[0] + j             indices, actions, trade_prices = trade_points_indices[trader_id], trade_points_actions[trader_id], trade_points_prices[trader_id]                          # Инициализация трейдера с начальной ценой и временной меткой             trader = FastGridTrading(timestamps[0], prices[0])             trader.update_strategy(grid_spread, position_size)              # Обработка сделок трейдера             for idx, action, price in zip(indices, actions, trade_prices):                 if 0 <= idx < len(timestamps):  # Проверка, что индекс в пределах допустимого диапазона                     if action == 2:                         trader.process_trade(timestamps[idx], 'BUY', price)  # Покупка                     elif action == 1:                         trader.process_trade(timestamps[idx], 'SELL', price)  # Продажа                     elif action == 0:                         break  # Завершаем обработку сделок              traders.append(trader)  # Добавляем трейдера в список      return traders 

Рассмотрим основные изменения в коде для новой версии функции для симуляции торговли:

  1. Использование ядра Cuda для поиска торговых точек. Для этого были созданы массивы данных, такие как d_prices, d_trade_points_indices, d_trade_points_actions и другие, которые передаются на устройство GPU с помощью функции cuda.to_device.

  2. Определение сетки CUDA: Размер блоков и сетки CUDA был настроен с использованием переменных threads_per_block и blocks_per_grid. Каждый блок в нашем случае содержит 8×8 потоков (всего 64 потока на блок), также, в свою очередь, мы используем двумерную сетку блоков 4х4 для удобной индексации потоков внутри CUDA-ядра.

  3. Запуск CUDA-ядра для вычисления торговых точек: Используется ядро CUDA find_trade_points_cuda, которое запускается для всех трейдеров одновременно. Это ядро отвечает за поиск торговых точек с учетом заданных параметров: цен, спредов и размеров позиций. После выполнения ядра, результаты (индексы торговых точек, действия и цены) копируются обратно на CPU для дальнейшей обработки.

  4. Обработка результатов на CPU: После выполнения вычислений на GPU, данные (торговые индексы, действия и цены) копируются в память CPU с помощью метода copy_to_host. Далее, для каждого трейдера, на основе найденных торговых точек, создаются объекты трейдеров и обрабатываются соответствующие сделки (покупки и продажи).

Отдельно отмечу, что после запуска CUDA-ядра (find_trade_points_cuda[blocks_per_grid, threads_per_block]) необходимо выполнить вызов cuda.synchronize() перед копированием данных обратно на хост. Это нужно для предотвращения состояния rece condition и синхронизации всех потоков, поскольку после старта ядра управление немедленно возвращается, и без синхронизации возможно, что данные на GPU ещё не будут готовы для переноса на хост.

Запустим симуляцию для одного трейдера и замерим среднее время вычислений

%timeit -r 3 -n 1 traders_cuda = run_trading_simulation_cuda(df_aggtrades, grid_spreads[:1], position_sizes[:1])  2min 56s ± 559 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

При выполнении симуляции с использованием вычислений на GPU для одного трейдера время работы оказывается немного дольше, чем при аналогичных вычислениях на CPU. Это неудивительно, ведь есть несколько факторов, которые влияют на производительность при работе с GPU.

Почему вычисления на GPU для одного трейдера занимают больше времени?

  1. Инициализация GPU: При запуске вычислений на GPU есть значительные затраты времени на инициализацию устройства и передачу данных с CPU на GPU. Особенно это заметно при выполнении симуляций с малым количеством трейдеров, так как запуск GPU-ядер и передача данных может занять больше времени по сравнению с обычными вычислениями на CPU, где данные уже находятся в памяти и не требуется время на их перемещение.

  2. Низкая нагрузка на параллелизм: Когда обрабатывается только один трейдер, GPU не использует свой потенциал параллельных вычислений на полную мощность. Архитектура GPU предназначена для массовых параллельных вычислений, и с одним трейдером вычисления не могут эффективно использовать несколько тысяч потоков. В таких случаях вычисления на CPU могут быть более быстрыми, так как они требуют меньшего времени на переключение контекста и загрузку данных.

С учетом этих факторов вычисления на GPU для одного трейдера могут быть менее эффективными, чем на CPU. Однако с увеличением количества трейдеров, GPU начинает демонстрировать свою истинную мощность.

Измерив среднее время вычисления всех 750 комбинаций, мы в этом убедимся:

%timeit -r 3 -n 1 traders_cuda = run_trading_simulation_cuda(df_aggtrades, grid_spreads, position_sizes)  3min 13s ± 42.4 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

Расчет всех 750 комбинаций стратегий, каждая из которых обрабатывает датафрейм размером из 551+ млн. строк на GPU занимает около 3 минут, тогда, как на CPU с использованием JIT-компиляция те же вычисления заняли бы у нас 6.9 часов, а на чистом Python — около 6 дней!

Неплохо, не правда ли?

Скрытый текст

Напоследок проанализируем результаты нашей стратегии и выявим оптимальные значения параметров grid_spreads и position_sizes.

Посмотрим на результаты 5 самых лучших трейдеров:

def get_best_traders(traders):     # Сортировка трейдеров по общему балансу на последней торговой точке (Total_balance)     best_traders = list(sorted(traders, key=lambda trader: trader.trading_history[-1]['Total_balance'], reverse=True))          # Вывод 5 лучших трейдеров     print('5 лучших трейдеров:')     for trader in best_traders[:5]:         print(F'Balance {trader.trading_history[-1]['Total_balance']}, Coin_amount {trader.coin_amount}, grid_spread = {trader.grid_spread}, position_size {trader.position_size}')              # Возвращаем отсортированный список трейдеров     return best_traders 
best_traders = get_best_traders(traders_aggtrades_cuda)  5 лучших трейдеров: Balance 1885.984680493356, Coin_amount 0.015925017303672642, grid_spread = 0.02, position_size 300 Balance 1872.3357090157292, Coin_amount 0.01835634717758678, grid_spread = 0.02, position_size 290 Balance 1842.2551673255375, Coin_amount 0.01772336968870448, grid_spread = 0.02, position_size 280 Balance 1841.3185065872528, Coin_amount 0.01709039219982217, grid_spread = 0.02, position_size 270 Balance 1828.858028097698, Coin_amount 0.01645741471093987, grid_spread = 0.02, position_size 260

Наконец, построим график результатов торговли для лучшего трейдера:

import matplotlib.pyplot as plt  def plot_backtest_results_(df, trader):     fig, ax = plt.subplots(3, 1, figsize=(12, 16), sharex=True)      # 📈 1️⃣ График цены монеты с покупками и продажами     ax[0].plot(df.index, df['close'], label="Цена", color='black', linewidth=1)      # Извлекаем сделки из истории     buy_trades = [trade for trade in trader.trading_history if trade['Action'] == "BUY"]     sell_trades = [trade for trade in trader.trading_history if trade['Action'] ==  "SELL"]      # Данные для точек покупок и продаж     buy_times = [trade["Time"] for trade in buy_trades]     buy_prices = [trade["Price"] for trade in buy_trades]      sell_times = [trade["Time"] for trade in sell_trades]     sell_prices = [trade["Price"] for trade in sell_trades]      # 📍 Добавляем точки на график цены     ax[0].scatter(buy_times, buy_prices, color='green', label="Покупки", marker='^', s=100)     ax[0].scatter(sell_times, sell_prices, color='red', label="Продажи", marker='v', s=100)      ax[0].set_ylabel("Цена (USDT)")     ax[0].set_title("График цены монеты и сделок")     ax[0].legend()     ax[0].grid()      # 💰 2️⃣ График роста депозита (usdt)     balance_history = [trade["Balance USDT"] for trade in trader.trading_history]     timestamps = [trade["Time"] for trade in trader.trading_history]      ax[1].plot(timestamps, balance_history, label="Баланс (USDT)", color='blue', linewidth=1.5)     ax[1].set_ylabel("Баланс (USDT)")     ax[1].set_title("Рост свободного депозита")     ax[1].legend()     ax[1].grid()      # 💰  График роста общего депозита      total_balance_history = [trade['Total_balance'] for trade in trader.trading_history]     timestamps = [trade["Time"] for trade in trader.trading_history]      ax[2].plot(timestamps, total_balance_history, label="Баланс (USDT)", color='blue', linewidth=1.5)     ax[2].set_ylabel("Баланс (USDT)")     ax[2].set_title("Рост общего депозита")     ax[2].legend()     ax[2].grid()      plt.xlabel("Время")     plt.xticks(rotation=45)     plt.tight_layout()     plt.show()  plot_backtest_results_(df_klines, best_traders[0])

На графике видно, что лучшим оказался трейдер с гиперпараметрами стратегии grid_spread = 0.02 и position_size = 300. Такая стратегия обеспечила прирост общего депозита на 80%.

Отдельно стоит отметить, что оптимальные гиперпараметры grid_spreads и position_sizes, а также связанные с ними доходность и риски будут отличаться в зависимости от направления тренда. В этой статье этот подход использован просто как наглядный пример того, как можно на практике эффективно ускорить бэктестинг стратегий, перенаправив вычисления на GPU. Этот метод особенно полезен, когда требуется обработать большое количество однотипных операций, что значительно сокращает время расчётов.


ссылка на оригинал статьи https://habr.com/ru/articles/893748/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *