TitanRt 0.3 — Пишем свой CoinMarketCup: часть 1

от автора

Что ж, да, вы не ослышались — прямо сейчас мы с вами подготовим свой движок для поддержания такого бэкенда, как у CoinMarketCup (кмк). И писать будем на моем любимом Rust. Использовать под капотом будем мою либу TitanRt, которую я лениво и скомкано презентовал в предыдущем посте. Постараюсь быть полезным и последовательным.

Осторожно: много кода (с комментариями)…


Зависимости

[package] name = "token_scanner" version = "0.1.0" edition = "2024"  [dependencies] titanrt = { version = "0.3.0", features = ["reqwest_conn"] } # serde = { version = "1.0.219", features = ["derive"] } anyhow = "1.0.99" tracing-subscriber = "0.3.20" tracing = "0.1.41" config = "0.15.15"

Заготовка main.rs

fn main() -> anyhow::Result<()> {     tracing_subscriber::fmt().with_max_level(Level::INFO).init(); // подключаем банальный трейсинг      let cfg = RuntimeConfig {         init_model_on_start: true, // запускаем модель сразу до начала лупа (не посылаем команду Start)         core_id: Some(1),          // пиним поток рантайма к ядру         max_inputs_pending: Some(128), // до скольких элементов можно забить очередь, если не успеваем обработать (в рантайм можно отправлять входные события)         max_inputs_drain: None, // сколько максимум дрейним на итерации входных событий (если None, то max_inputs_pending)         stop_model_timeout: Some(5), // максимальное время ожидания модели для остановки внутри лупа     };      // вычисляем путь к файлу с конфигом     let model_cfg_path = if let Some(path) = env::args().nth(1) {         path     } else {         "./model_cfg".to_string()     };      // загружаем конфиг     tracing::info!("Loading model config from: {}", model_cfg_path);     let model_cfg = load_model_cfg(model_cfg_path)?;      // модель будет отсылать события, которые мы будем прослушивать и сами предпринимать действия     //  создаем канал взаимодействия между моделью и текущим потоком     let (output_tx, mut output_rx) = MpmcChannel::unbounded::<Output<OuputEvent>>();      // пока нам не нужен никакой контекст, все берем из конфига     // юзаем удобный маркер из либы     let model_ctx = NullModelCtx;      // запускаем отдельный поток с рантаймом, где будет крутиться обслуживающий busy-loop и дергаться модель     let rt = Runtime::<TokenScannerModel>::spawn(cfg, model_ctx, model_cfg, output_tx)?;      let mut outputs_count = 0;      // принимаем от модели события и просто принтим их (здесь может быть сложная логика, например, для взаимодействия с бэком)     loop {         if outputs_count > 1000 {             break Ok(());         }          while let Ok(output) = output_rx.try_recv() {             tracing::info!("{:?}", output);             outputs_count += 1;         }          // просто держим поток в течение 100 мс         std::thread::sleep(std::time::Duration::from_millis(100));     } }  // вспомогательная функция для загрузки конфига fn load_model_cfg(path: String) -> anyhow::Result<TokenScannerConfig> {     let cfg = Config::builder()         .add_source(config::File::from(PathBuf::from(&path)))         .build()         .with_context(|| format!("failed to read model config from {path}"))?;      let model: TokenScannerConfig = cfg         .try_deserialize()         .with_context(|| format!("failed to deserialize model config from {path}"))?;      Ok(model) } 

Предварительно реализуем саму модель

pub struct TokenScannerModel {}  impl BaseModel for TokenScannerModel {     type Config = TokenScannerConfig; // тип конфига     type OutputTx = MpmcSender<Output<OuputEvent>>; // указываем имплементацию трансмиссии     type OutputEvent = OuputEvent; // событие трансмиссии     type Event = NullEvent; // не шлем события в модель     type Ctx = NullModelCtx; // не используем контекст      fn initialize(         ctx: Self::Ctx,         config: Self::Config,         reserved_core_id: Option<usize>,         output_tx: Self::OutputTx,         cancel_token: CancelToken,     ) -> anyhow::Result<Self> {         todo!()     }      fn execute(&mut self) -> ExecutionResult {         todo!()     }      // не шлем в модель извне никаких событий (могли бы через объект рантайма)     fn on_event(&mut self, event: Self::Event, meta: Option<InputMeta>) {         unimplemented!()     }      // останавливаемся сразу     fn stop(&mut self, kind: StopKind) -> StopState {         // StopState::InProgress         StopState::Done     }      // не релоадим конфиги на лету     fn hot_reload(&mut self, config: &Self::Config) -> anyhow::Result<()> {         unimplemented!()     } } 

Что делаем во время инициализации модели

Нам нужен стрим, который будет в отдельном потоке принимать команды от модели и асинхронно отправлять запросы на биржи, чтобы фетчить данные.

В версии 0.3.0 я добавил такой универсальный http-коннектор с подобным стримом на базе reqwest. Работает в одном треде в цикле в такой схеме:

  1. В начале итерации проверяет очередь команд, если не пустая, то создает фоновые токио таски через local_spawn, внутри которой идет проверка встроенного рейт лимит менеджера, затем уже отправка запроса на сервер. После ожидания ответа отправляем через mcmp канал ответ в наш основной цикл.

  2. Идет проверка на готовые ответы. Если ответы имеются, то вызываем модельный хук и передаем все нужное, в том числе сырой ответ.

  3. Даем токио рантайму немного продышаться и выполнить таски.

  4. Конец итерации

Просто возьмем из titanrt уже готовый коннектор и заспавним стрим прямо в модели во время инита:

pub struct TokenScannerModel {     reqwest_stream: Stream<RingSender<ReqwestAction>, RingReceiver<CryptoEvent>, NullState>, // держим стрим для отправки http запросов и вызова хука     output_tx: MpmcSender<Output<OuputEvent>>, // держим отправителя для отправки внутренний событий в вызывающий код }  impl BaseModel for TokenScannerModel { ...  fn initialize(         _ctx: Self::Ctx,         config: Self::Config,         reserved_core_id: Option<usize>,         output_tx: Self::OutputTx,         cancel_token: CancelToken,     ) -> anyhow::Result<Self> {         // инитим коннектор (могли бы и закрепить его в структуре, чтобы можно было всегда заново создать стрим или еще один)         let mut conn = ReqwestConnector::init(             config.reqwest_conn,      // вынесем конфиг коннектора в модельный конфиг             cancel_token.new_child(), // модельный токен будет родителем, если отменим стрим, то модель останется             reserved_core_id.map(|c| vec![c]), // укажем зарезервированное ядро         )?;          // для ясности:         // let mut desc = ReqwestStreamDescriptor {         //     max_hook_calls_at_once: 10, // максимальное количество вызовов хуков в одной итерации (по сути, сколько дреним ответов за раз)         //     wait_async_tasks_us: 0, // сколько даем поспать токио рантайма - основное место для джиттера         //     max_pending_actions: None, // максимальное количество команд, которые стрим держит необработанными в очереди         //     max_pending_events: None, // максимальное количество событий, которые модель держит необработанными в очереди         //     core_pick_policy: Some(CorePickPolicy::Specific(2)), // привяжем поток стрима на отдельное ядро процессора         //     rate_limits: config.reqwest_rate_limits.clone(),         // };          // короче:         let mut desc = ReqwestStreamDescriptor::low_latency();          for rl_cfg in config.reqwest_rate_limits.iter() {             desc.add_rate_limit(rl_cfg.clone());         }          // наконец, спавним стрим в отдельном потоке через коннектор         // передаем объем дескриптора и кастомный модельный hook, реализуемый в другом месте         let reqwest_stream = conn.spawn_stream(desc, crypto_hook)?;          // ждем пока стрим не запуститься         // например, могли накосячить с конфигом и стрим не запустился         let deadline = Instant::now() + Duration::from_secs(10);          while !reqwest_stream.is_healthy() {             if Instant::now() >= deadline {                 return Err(anyhow::anyhow!(                     "stream didn't become healthy within 10 seconds",                 ));             }             sleep(Duration::from_secs(1));         }          Ok(Self {             output_tx,             reqwest_stream,         })     } }

Конфигурацию мы оставим на потом, пока просто примем во внимание, что она есть.

Как сейчас выглядит crypto_hook

#[derive(Debug, Clone)] pub struct CryptoEvent {}  pub fn crypto_hook(     args: HookArgs<         ReqwestEvent, // сырое событие, которое детерминируется на уровне реализации стрима         RingSender<CryptoEvent>, // будем отправлять из хука спарсенные события         NullReducer, // можете реализовать на случай, если нужен стейт внутри стрима         NullState, // это если нужен стейт, который будете лоадить в модели         ReqwestStreamDescriptor, // из-за этого имеем доступ к нашему дескриптору внутри хука     >, ) {     if args.raw.is_success() {     } else {     } } 

Скоро мы это исправим, но прежде мы должны понять, что мы вообще делаем.

Бизнес логика — execute()

А что мы вообще делаем, собственно? Титан райнтайм запустит модель и будет шарашить model.execute() миллионы раз в секунду. Чем толще и массивнее execute, тем меньше итераций в лупе рантайме в секунду. И это основное место, где реализуется бизнес логика модели (или торговая, если хотите).

Очевидно, на экзекьюте нам нужно реализовать логику отправки команд в наш стрим, а также дренить (высушивать) наш ресивер, через который мы отправляем спарсенные ответы в хуке.

Давайте подготовим минимальный плацдарм просто для наглядности:

impl BaseModel for TokenScannerModel { ...  fn execute(&mut self) -> ExecutionResult {         let events = self.reqwest_stream.drain_max(); // полностью осушаем ресивер          for event in events {             tracing::debug!("got event: {:?}", event);         }          if self.reqwest_stream.is_healthy() {             // экшены детерминируются реализацией стрима, поэтому юзаем то, что нам дают (или сами реализуем стрим)                          // здесь имеется в виду, что мы билдим GET запрос, передаем рл контекст, чтобы можно было отыскать конфигурацию рейт лимитера             let action = ReqwestAction::get(                 Url::parse(                     "https://api.bybit.com/v5/market/orderbook?category=spot&symbol=BTCUSDT",                 )                 .unwrap(),             )             .rl_ctx(RateLimitContext::new("bybit"))             .build();                          // и просто отправляем экшен в стрим, чтобы в ответ на него получить событие в нашем хуке             match self.reqwest_stream.try_send(action) {                 Ok(()) => {}                 Err(e) => {                     tracing::error!("error sending reqwest action: {}", e);                 }             }         }          ExecutionResult::Relax // чтобы не жечь ядро и чутка тормозить busy-loop     } }

Естественно, это не просто минимальный плацдарм, а скорее пустышка в демонстрационных целях, как можно за минуту реализовать получение снэпшотов ордербука с bybit внутри execute. Если мы это запустим, то даже событий никаких не будет, так как у нас пустой хук. Тем не менее, коллеги, это точка роста и начало программистской мысли.

В следующей части

  • Реализуем OutputEvent — событие, которое шлет модель время от времени в вызывающий код, например, для бэкенда. Это может быть снимок состояния токенов, которые мы сканируем. Или дельты по конкретному токену для сохранения в базу.

  • Реализуем CryptoEvent — событие, в которое парсим сырой ответ из хука. Это будет событие с req_id, а так же внутренним enum, чтобы не ограничиваться только запросами снэпшотов стакана, а использовать разные данные с бирж, в том числе Ticker Info. И сделаем его унифицированным, чтобы не ограничиваться только биржей байбит или рынком спот.

  • Реализуем хук и парсинг сырых событий с разных источников. Возьмем 2: bybit + binance.

  • Реализуем тайминги внутри execute и дополнительные структуры для отправки экшенов. Так же поймем, что будем делать с объектами CryptoEvent и в какой редьюсер их собирать, чтобы затем высылать output_tx.

  • Расширим конфигурацию модели и обсудим ее подробнее.

Код: https://github.com/rgnatovsky/token_scanner


ссылка на оригинал статьи https://habr.com/ru/articles/944308/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *