
Здравствуйте дорогие хабровчане, в этом посте я покажу, как написать свой биржевой индекс наподобие S&P 500 или Nasdaq.
О том, как мне это пришло в голову можно прочитать в моей предыдущей статье: Как я решил стать трейдером и проигрался, а потом отыгрался, потому, что я программист. Мой опыт. Здесь будет рассмотрена только техническая сторона.
В силу того, что прошлая статья получила большой отклик у хабровчян, я решил сделать техническое описание кода приложения.
Не работаю в Тинькофф и мне не заплатили за рекламу. Конечно, была попытка добавить эту статью в корпоративный блог Тинькова хоть за символическую плату, но не тут то было. Меня просто месяц динамили, а потом сказали, дескать блог только исключительно для сотрудников компании (должно быть это месть за критику службы поддержки в прошлой статье).

В предыдущей статье было описано, как мне удалось рассчитать и использовать индекс чистой стоимости фонда и как я к этому пришёл. Этот индекс был написан на старом api, на тот момент единственном. Из-за этого возникло множество проблем, главная из которых большая задержка в обновлении данных.
Чтобы уменьшить эту задержку (которая в итоге составила 5,5 секунд!), мне пришлось использовать два токена доступа, которые чередовались для каждого запроса. Также я вынужден был выбрать строго ограниченное количество компаний, для которых проводился расчёт. Бутылочным горлышком являлось отсутствие возможности получать данные по нескольким компаниям в одном запросе. Например, чтобы получить текущие цены акций десяти компаний нужно было десять запросов. Если не укладывался в лимит, а количество запросов было ограничено в минуту, api возвращал ошибки на последующие обращения.
Однако, в начале 2022 года тинькофф выкатил новый api, в котором эти проблемы были решены. Теперь можно в одном запросе получать все текущие цены на акции.
Сам микросервис написан на Python 3 и упакован в Docker-контейнер, клиент — на JavaScript, график TradingView. Микросервис и клиент общаются при помощи api реализованном на FastApi.
Репозиторий с исходным кодом — на GitHub.

Шаг 1. Получаю данные из Тинькофф api
import pandas as pd import numpy as np from datetime import datetime, timedelta import logging import sys from tinkoff.invest import Client, CandleInterval import warnings warnings.filterwarnings("ignore", 'This pattern has match groups') class CustomIndex: def __init__(self, tickets, historical_days=2, token='token.txt'): self.logger = self.__create_logger() self.historical_days = historical_days self.reset_last_candle() try: with open(token, 'r') as file: self.__token = file.read().rstrip() except Exception as e: self.logger.exception(e) raise Exception('--> Ошибка в файле token.txt: '+ str(e)) try: with Client(self.__token) as client: shares = client.instruments.shares() except Exception as e: self.logger.exception(e) raise Exception('--> tinkoff api - Ошибка загрузки данных обо всех акциях.') shares = pd.DataFrame(shares.instruments) self.df = shares[shares['ticker'].isin(tickets)] def reset_last_candle(self, open_price=0.0, time=pd.Timestamp(0, tz='Europe/Moscow')): self.last_candle = pd.Series({'open': open_price, 'high': open_price, 'low': open_price, 'close': open_price, 'time': time}) def round_to_minutes(self, t, interval=5): delta = pd.Timedelta(minutes=t.minute % interval, seconds=t.second, microseconds=t.microsecond) t -= delta return t def __units_nano_convert(self, d): price = '{}.{}'.format(d['units'], d['nano']) price = float(price) return price def __create_logger(self): logger = logging.getLogger() logger.setLevel(logging.ERROR) formatter = logging.Formatter('--> %(asctime)s - %(name)s - %(levelname)s - %(message)s') sh = logging.StreamHandler(sys.stdout) sh.setFormatter(formatter) logger.addHandler(sh) # send logs in docker logs fh = logging.FileHandler('/proc/1/fd/1') fh.setFormatter(formatter) logger.addHandler(fh) return logger def get_tinkoff_candles(self, figi, interval): if interval == 1: interval = CandleInterval.CANDLE_INTERVAL_1_MIN elif interval == 5: interval = CandleInterval.CANDLE_INTERVAL_5_MIN elif interval == 15: interval = CandleInterval.CANDLE_INTERVAL_15_MIN else: interval = -1 curr_time = datetime.now() data = [] for day in range(self.historical_days): try: with Client(self.__token) as client: data += client.market_data.get_candles( figi=figi, from_=curr_time - timedelta(days=day+1), to=curr_time - timedelta(days=day), interval=interval ).candles except Exception as e: self.logger.exception(e) raise Exception('--> tinkoff api - history - Ошибка загрузки исторических данных.') candles = pd.DataFrame(data) for col in ['open', 'high', 'low', 'close']: candles[col] = candles[col].apply(self.__units_nano_convert) candles = candles[['time', 'open', 'high', 'low', 'close']] candles['time'] = candles['time'].dt.tz_convert('Europe/Moscow') candles.set_index('time', inplace=True) candles = candles.drop_duplicates() return candles def get_tinkoff_last_prices(self): try: with Client(self.__token) as client: last_prices = client.market_data.get_last_prices(figi=self.df['figi'].to_list()) except Exception as e: self.logger.exception(e) raise Exception('--> tinkoff api - last price - Ошибка загрузки последней цены.') last_prices = pd.DataFrame(last_prices.last_prices) last_prices = last_prices['price'].apply(self.__units_nano_convert) return last_prices
У каждой компании, торгующейся на бирже, кроме названия, есть свой тикет. Однако, для того, чтобы получить данные по компании необходимо знать её figi код. Чтобы его получить необходимо сопоставить тикет и figi код. Это происходит в конструкторе класса CustomIndex. В нём скачиваются figi для всех акций и сопоставляются с нужными мне тикетами. Здесь нет ничего сложного.
Метод get_tinkoff_candles получает исторические данные по figi коду. Исходя из ограничений api на интервалах 1, 5 и 15 минут можно получить только один день. Так же было и в старом api. То есть, чем больше исторических данных мне нужно, тем больше нужно запросов (ограничение запросов в минуту никуда не делось). Любопытно было бы узнать, из каких соображений данные ограничили одним днём.
Ну и главное отличие от старого api я с удовольствием использовал в методе get_tinkoff_last_prices. В нём обновляются все последние цены для всех акций за один запрос.
Шаг 2. Микросервис (FastApi)
import pandas as pd import numpy as np from collections import defaultdict from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from custom_index import CustomIndex # Топ 5 самых дорогих акций из S&P 500 tickets = ['NVR', 'AMZN', 'GOOG', 'GOOGL', 'BKNG'] # Количество исторических дней для отображения historical_days = 4 # Расчёт индекса def compute_index(prices): mean_price = np.mean(prices) return mean_price api = FastAPI() api.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) ci = CustomIndex(tickets, historical_days=historical_days, token='token.txt') # Исторические данные @api.get('/api/historical_candles/{interval}') def historical_candles(interval: int): global ci # Средняя цена d = defaultdict(pd.DataFrame) # Собираем данные, все open от всех тикетов в один # датафрейм, high в другой и т.д. def concat_columns(d, one_ticket_candles): for col in ['open', 'high', 'low', 'close']: d[col] = pd.concat([d[col], one_ticket_candles[col]], axis=1) return d try: # Исторические данные для каждого тикета # скачиваем и обрабатываем отдельно for _, row in ci.df.iterrows(): one_ticket_candles = ci.get_tinkoff_candles(row['figi'], interval) d = concat_columns(d, one_ticket_candles) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) for col in ['open', 'high', 'low', 'close']: d[col] = d[col].sort_index(ascending=True) d[col] = d[col].fillna(method='ffill') d[col] = d[col].dropna() d[col] = d[col].apply(compute_index, axis=1) candles = pd.DataFrame(d) candles.index.name = 'time' candles.reset_index(inplace=True) ci.reset_last_candle() return candles.to_json(orient="records") # Текущая цена @api.get('/api/currient_candle/{interval}') def currient_candle(interval: int): global ci curr_time = ci.round_to_minutes(pd.Timestamp.now(tz='Europe/Moscow'), interval) try: last_prices = ci.get_tinkoff_last_prices() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) last_price = compute_index(last_prices) # формирование свечей по последней цене if curr_time > ci.last_candle['time']: # Новая свеча ci.reset_last_candle(open_price=last_price, time=curr_time) else: ci.last_candle['close'] = last_price if ci.last_candle['high'] < last_price: ci.last_candle['high'] = last_price elif ci.last_candle['low'] > last_price: ci.last_candle['low'] = last_price return ci.last_candle.to_json()
Через этот микросервис поставляются данные клиенту. Основные параметры для настройки находятся в начале файла.
- Лист
tickets— в нём записаны тикеты компании на основе которых будет рассчитываться индекс. Для примера я взял 5 самых дорогих акций из списка S&P 500. - Переменная
historical_days— количество исторических дней для загрузки. Если дней будет слишком много, то можно получить ошибки, см. лимитную политику. - Функция
compute_index— здесь происходит непосредственный расчёт индекса, в моём случае я просто беру среднюю цену всех акций.
В функции historical_candles формируются исторические данные. Для их расчёта группируются все цены каждого тикета, т.е. цены open, high, low и close рассчитываются отдельно друг от друга. На выходе получаются исторические данные рассчитанного индекса.
Функция currient_candle формирует текущую свечку графика и рассчитывает индекс исходя из текущих цен на акции.
Шаг 3. Клиент (TradingView)
<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>Custom fund index</title> </head> <body> <h2>Custom fund index chart</h2> <div> <label for="one"> <input type="radio" id="one" name="interval" value="1" /> 1 minute </label> <label for="five"> <input type="radio" id="five" name="interval" value="5" checked /> 5 minutes </label> <label for="fifteen"> <input type="radio" id="fifteen" name="interval" value="15" /> 15 minutes </label> </div> <div id="custom_chart"></div> </body> <!-- TradingView Chart --> <script src="https://unpkg.com/lightweight-charts/dist/lightweight-charts.standalone.production.js"></script> <script src="index.js"></script> </html>
Файл index.html совсем простой, содержит в себе переключатель ‘radio’ для изменения интервала графика 1, 5 и 15 минут, также div контейнер custom_chart для самого графика. Здесь же подключены скрипты TradingView.
const log = console.log; let interval = document.querySelector('input[name="interval"]:checked').value; let currInterval = interval const chartProperties = { width: 1450, height: 600, timeScale: { timeVisible: true, secondsVisible: false, } }; const domElement = document.getElementById('custom_chart'); const chart = LightweightCharts.createChart(domElement, chartProperties); const candleSeries = chart.addCandlestickSeries() // History function getHistory(interval) { fetch(`http://127.0.0.1:8000/api/historical_candles/` + interval) .then(res => res.json()) .then(json_str => JSON.parse(json_str)) .then(data => { // log(data); for (let i = 0; i < data.length; ++i) { data[i].time = data[i].time / 1000 + 10800; // localize to Moscow time 60*60*3 = 10800 }; candleSeries.setData(data); }) .catch(err => log(err)) } getHistory(interval); // Dynamic Chart setInterval(function () { currInterval = document.querySelector('input[name="interval"]:checked').value; if (currInterval != interval) { getHistory(currInterval); interval = currInterval; } fetch(`http://127.0.0.1:8000/api/currient_candle/` + interval) .then(res => res.json()) .then(json_str => JSON.parse(json_str)) .then(data => { log(data); data.time = data.time / 1000 + 10800 // localize to Moscow time 60*60*3 = 10800 candleSeries.update(data); }) .catch(err => log(err)) }, 1000); // <-- Увеличивай интервал здесь!
В файле index.js функция getHistory получает исторические данные из микросервиса и отображает график. Обновление текущей цены происходит в планировщике вызова setInterval, здесь же обновляются исторические данные, если поменять интервал свечей графика.
Шаг 4. Упаковываем в Docker контейнер
FROM python:3.8 RUN python -m pip install pandas numpy tinkoff-investments fastapi gunicorn uvicorn uvloop httptools WORKDIR /app ADD tinkoff-microservice.py tinkoff-microservice.py ADD custom_index.py custom_index.py ADD token.txt token.txt
Упакованный в Docker-контейнер микросервис очень удобно использовать и портировать.
version: '3.7' services: microservice: build: context: ./microservice image: tinkoff-microservice container_name: tinkoff-microservice restart: unless-stopped ports: - "8000:8000" command: gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker tinkoff-microservice:api nginx: image: nginx container_name: nginx-html restart: unless-stopped volumes: - ./html:/usr/share/nginx/html:ro depends_on: - microservice ports: - "8080:80"
Микросервис и клиент общаются между собой через порт 8000, клиент доступен на localhost:8080

Примечание: для работы приложения нужен docker-compose и браузер. Чтобы запустить микросервис необходимо добавить свой токен от тиньков api в файл token.txt, который находится в папке microservice. Выполнить в корневой папке проекта docker-compose up --build и открыть в браузере http://localhost:8080/.
Заключение
Как и было отмечено в предыдущей статье, подобный индекс вполне себе рабочий инструмент. Однако есть несколько недочетов, которые не удалось решить (возможное решение это stream-соединения в тинькофф api):
- Не совсем точное отображение истории.
- Ошибки из-за превышения лимита запросов к тинькофф api.
Спасибо за внимание.
ссылка на оригинал статьи https://habr.com/ru/post/656547/
Добавить комментарий