TitanRt — реактивный рантайм для реального времени (и не только HFT)

от автора

Сегодня решил собрать воедино то, что я знаю о разработке высокочастотных систем в связке с Rust. Из кусков кода, по сусекам, что называется, по репозиториям наскреб и склеил с помощью готопоты достойную к вашему вниманию либу. Библа позволит сохранить время всем, кто стремиться одновременно и к скорости, и гибкости. Планирую сам активно юзать, чтобы перейти со старой асинхронной торговой инфры в истинные треды с закосом под ультра. И развивать либу под брендом TitanRt. А если сообществу зайдет, так вообще мотивации прибавиться.

Итак, TitanRt — typed reactive runtime для построения реактивных, низколатентных систем на Rust.

Если упростить: это минималистичная основа для приложений, которые живут в цикле событий, где важны:

  • миллисекунды (а иногда и наносекунды)

  • предсказуемая обратная нагрузка (back-pressure)

  • чёткий контроль жизненного цикла модели и её воркеров

  • максимальная гибкость в области более высокоуровневой разработки

  • возможность прибить всё красиво и быстро, а не висеть на zombie-тредах


Зачем ещё один рантайм?

Rust уже имеет Tokio, Actix и прочих асинхронных гигантов. Но они решают другую задачу — высокоуровневый async/await, HTTP-сервисы, очереди.

TitanRt сделан для систем реального времени:

  • торговые движки (HFT/market making)

  • анализ рыночных потоков

  • телеметрия и алертинг

  • системы с жёстким контролем задержки

То есть там, где:

  • нельзя позволить себе лишний аллокатор

  • хочется контролировать ядро CPU, на котором работает поток

  • нужны строго типизированные каналы между моделью и воркерами


Архитектура

Вместо гигантского фреймворка TitanRt — это всего пара простых идей:

  1. Model-first:
    Ваша бизнес-логика = BaseModel.
    Она сама создаёт коннекторы и стримы, сама ими управляет, хотя не детерминирует это поведение. Вы можете управлять извне через Control Plane слой.

  2. Connector / Stream layer:
    Коннектор = фабрика стримов.
    Стрим = воркер-тред с типизированными каналами и вкуснятиной в виду StateCell<T>, где под капотом arc-swap. Со стримом можно общаться разными способами: через ваш типизированный Action, который вы отправляете через ваш типизированный Tx<Action>, а так же через типизированный hook, в который приходит сырое событие стрима, ваш Rx<Event> и StateCell<T> — здесь вы выбираете, что делать с данными, чтобы оркестрировать этим цикле модели.

  3. Runtime:
    Небольшой управляющий поток, который гоняет команды: Start, Stop,Restart,HotReload, Shutdown. Ну и саму модель, на которой вызывается ваш импл execute() или event() — если вы вдруг гоняете события извне рантайма.
    В общем и целом, рантайм не знает про ваши протоколы и бизнес-логику — только управляет жизненным циклом модели.

Не совсем детальная, но какая-никакая схема:

                   ┌──────────────────────────┐                    │         Runtime          │                    │ Start/Stop/Restart/...   │                    └───────────▲─────────────┘                                │                      ┌─────────┴─────────┐                      │       Model        │                      │ owns connectors    │                      └─────────┬─────────┘                                │                    ┌───────────▼───────────┐                    │     Connector(s)      │                    │    spawn Stream(s)    │                    └───────────┬───────────┘                                │                     Actions ───►│   │◄─── Events                                ▼                             Stream 

Как это выглядит в коде

Минимальная модель:

use titanrt::model::{BaseModel, ExecutionResult, StopKind, StopState}; use titanrt::utils::CancelToken; use anyhow::Result;  #[derive(Clone, Debug)] struct MyConfig {     greeting: String, }  #[derive(Clone, Debug)] struct MyEvent(String);  #[derive(Clone)] struct MyOutputTx; // no-op  impl titanrt::io::base::BaseTx for MyOutputTx {     type EventType = String;     fn try_send(&mut self, v: String) -> Result<(), titanrt::error::SendError<String>> {         println!("OUT: {v}");         Ok(())     }     fn send(         &mut self,         v: String,         _: &CancelToken,         _: Option<std::time::Duration>,     ) -> Result<(), titanrt::error::SendError<String>> {         self.try_send(v)     } }  struct MyModel {     cfg: MyConfig,     out: MyOutputTx, }  impl BaseModel for MyModel {     type Config = MyConfig;     type OutputTx = MyOutputTx;     type Event = MyEvent;     type Ctx = ();      fn initialize(         _ctx: (),         config: MyConfig,         _core_id: Option<usize>,         output_tx: MyOutputTx,         _cancel: CancelToken,     ) -> Result<Self> {         Ok(Self { cfg: config, out: output_tx })     }      fn execute(&mut self) -> ExecutionResult {         let _ = self.out.try_send(format!("{}!", self.cfg.greeting));         ExecutionResult::Relax     }      fn on_event(&mut self, event: MyEvent) {         let _ = self.out.try_send(format!("event: {}", event.0));     }      fn stop(&mut self, _kind: StopKind) -> StopState {         StopState::Done     } } 

А потом — просто запускаем:

fn main() -> Result<()> {     let cfg = titanrt::config::RuntimeConfig {         init_model_on_start: true,         core_id: None,         max_inputs_pending: Some(1024),         max_inputs_drain: Some(64),         stop_model_timeout: Some(5),     };      let rt = titanrt::runtime::Runtime::<MyModel>::spawn(         cfg,         NullModelCtx,          MyConfig { greeting: "Hello, TitanRt".into() },         MyOutputTx,     )?;      rt.run_blocking()?;     Ok(()) } 

Чем это отличается от Tokio?

  • Без async/await: здесь честные треды, без скрытого планировщика. Тем не менее, планируется добавить опциональный токио рантайм из коробки.

  • CPU pinning: можно закрепить поток за ядром. Хочется в будущих версиях добавить настройку планировщика и приоритетность.

  • Typed I/O: никакого Any, всё компилируется статически. Строго.

  • Минимум зависимостей: crossbeam, arc-swap,ahash, ringbuf, немного serde.


Когда использовать TitanRt

  • Нужно максимально предсказуемое поведение под нагрузкой.

  • Ваш код живёт в бесконечном цикле: обработка стакана, телеметрии, сигналов.

  • Вы пишете HFT-движок, рынок данных, RT-алертинг или даже какую-нибудь игровую сетевую петлю.

А если нужен HTTP, gRPC, база и веб — берите Tokio/Actix, TitanRt не про это.


Репозиторий и документация


Итог

TitanRt — это скелет для реактивных систем, где важна латентность и контроль.
Вы пишете модель и коннекторы → модель создаёт коннекторы и стримы → стримы гоняют события → рантайм следит за жизненным циклом.

Просто, предсказуемо и типобезопасно.


Плюшка в виде простого yellowstone-grpc стрима:

 pub struct CompositeConnector {     pub(crate) config: CompositeConfig,     pub(crate) cancel_token: CancelToken,     pub(crate) core_stats: Option<Arc<CoreStats>>, }  impl BaseConnector for CompositeConnector {     type Config = CompositeConfig;      fn init(         config: Self::Config,         cancel_token: CancelToken,         reserved_core_ids: Option<Vec<usize>>,     ) -> anyhow::Result<Self> {         let core_stats = if config.with_core_stats {             Some(CoreStats::new(                 config.default_max_cores,                 config.specific_cores.clone(),                 reserved_core_ids.unwrap_or_default(),             )?)         } else {             None         };          Ok(Self {             config,             cancel_token,             core_stats,         })     }      fn name(&self) -> impl AsRef<str> + Display {         "CompositeConnector"     }      fn config(&self) -> &Self::Config {         &self.config     }      fn cancel_token(&self) -> &CancelToken {         &self.cancel_token     }      fn cores_stats(&self) -> Option<Arc<CoreStats>> {         self.core_stats.clone()     } }  #[derive(Clone)] pub enum GeyserAction {    Subscribe(SubscribeRequest),       UnsubscribeAll, }  #[derive(Clone, Debug)] pub struct GeyserGrpcDescriptor {     pub endpoint: String,     pub auth_token: Option<String>,     pub max_pending_actions: Option<usize>,     pub max_pending_events: Option<usize>,     pub core_pick_policy: CorePickPolicy,     pub subscription: Option<SubscribeRequest>, }  impl GeyserGrpcDescriptor {     pub fn new(         endpoint: String,         auth_token: Option<String>,         max_pending_actions: Option<usize>,         max_pending_events: Option<usize>,         core_pick_policy: CorePickPolicy,     ) -> Self {         Self {             endpoint,             auth_token,             max_pending_actions,             max_pending_events,             core_pick_policy,             subscription: None,         }     }      pub fn with_subscription(mut self, sub: SubscribeRequest) -> Self {         self.subscription = Some(sub);         self     } }  impl StreamDescriptor for GeyserGrpcDescriptor {     fn venue(&self) -> impl Venue {         Venues::Solana     }      fn kind(&self) -> impl Kind {         "YellowstoneGrpc"     }      fn max_pending_actions(&self) -> Option<usize> {         self.max_pending_actions     }      fn max_pending_events(&self) -> Option<usize> {         self.max_pending_events     }      fn core_pick_policy(&self) -> Option<CorePickPolicy> {         Some(self.core_pick_policy)     }      fn health_at_start(&self) -> bool {         false     } }  #[derive(Clone, Debug)] pub enum GeyserEvent {     Raw(UpdateOneof), }  impl<E, S> StreamSpawner<GeyserGrpcDescriptor, E, S> for CompositeConector where     S: StateMarker,     E: BaseTx + TxPairExt, { }  impl<E, S> StreamRunner<GeyserGrpcDescriptor, E, S> for CompositeConector where     S: StateMarker,     E: BaseTx, {     type Config = ();     type ActionTx = RingSender<GeyserAction>;     type RawEvent = GeyserEvent;     type Hook = fn(&Self::RawEvent, &mut E, &StateCell<S>);      fn build_config(&mut self, _desc: &GeyserGrpcDescriptor) -> anyhow::Result<Self::Config> {         Ok(())     }      fn run(         mut ctx: RuntimeCtx<GeyserGrpcDescriptor, Self, E, S>,         hook: Self::Hook,     ) -> StreamResult<()> {         // Однопоточный рантайм ТОЛЬКО внутри run (внешний API остаётся синхронным)         let rt = Builder::new_current_thread()             .enable_time()             .enable_io()             .build()             .map_err(|e| StreamError::Unknown(anyhow!(e)))?;          let mut rng = SmallRng::from_os_rng();         let mut backoff_ms: u64 = 50;         let backoff_max_ms: u64 = 5_000;         let backoff_mul: f64 = 1.7;          if !ctx.desc.health_at_start() {             ctx.health.set(true);         }          rt.block_on(async move {             'reconnect: loop {                 if ctx.cancel.is_cancelled() { break Ok(()); }                  if backoff_ms > 50 {                     let j = rng.random_range(0..(backoff_ms / 5 + 1));                     tokio::time::sleep(Duration::from_millis(j)).await;                 }                  let mut client = match GeyserGrpcClient::build_from_shared(ctx.desc.endpoint.clone())                     .map_err(|e| StreamError::Unknown(anyhow!(e)))?                     .x_token(ctx.desc.auth_token.clone())                     .map_err(|e| StreamError::Unknown(anyhow!(e)))?                     .tls_config(ClientTlsConfig::new().with_native_roots())                     .map_err(|e| StreamError::Unknown(anyhow!(e)))?                     .connect().await                 {                     Ok(c) => c,                     Err(e) => {                         tracing::warn!("grpc connect error: {e}");                         let sleep = backoff_ms.min(backoff_max_ms);                         tokio::time::sleep(Duration::from_millis(sleep)).await;                         backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;                         continue 'reconnect;                     }                 };                  let (mut tx, mut rx) = match client.subscribe().await {                     Ok(x) => x,                     Err(e) => {                         tracing::warn!("subscribe call error: {e}");                         let sleep = backoff_ms.min(backoff_max_ms);                         tokio::time::sleep(Duration::from_millis(sleep)).await;                         backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;                         continue 'reconnect;                     }                 };                  if let Some(req) = ctx.desc.subscription.as_ref() {                     if let Err(e) = tx.send(req.clone()).await {                         tracing::warn!("send initial subreq: {e}");                     }                 }                  ctx.health.set(true);                  backoff_ms = 50;                  let mut tick = tokio::time::interval(Duration::from_millis(1));                 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);                  'session: loop {                     tokio::select! {                         biased;                          m = rx.next() => {                             match m {                                 Some(Ok(update)) => {                                     match update.update_oneof {                                          Some(UpdateOneof::Ping(_)) => {                                             tracing::debug!("ping update");                                             let _ = tx.send(SubscribeRequest {                                                                ping: Some(SubscribeRequestPing {id: 0}),                                                               ..Default::default()                                                              }).await;                                         }                                         Some(u) => {                                                 // А вот и хук из модели: модель сама решает, что делать                                                 hook(&GeyserEvent::Raw(u), &mut ctx.event_tx, &ctx.state);                                         }                                         None => {}                                     }                                 }                                 Some(Err(status)) => {                                     tracing::warn!("grpc stream error: {status}");                                     ctx.health.set(false);                                     break 'session;                                 }                                 None => {                                     tracing::warn!("grpc stream ended");                                     ctx.health.set(false);                                     break 'session;                                 }                             }                         }                          _ = tick.tick() => {                             let mut sent = 0usize;                              Проверим команды из модели                             while let Ok(a) = ctx.action_rx.try_recv() {                                 match a {                                     GeyserAction::Subscribe(req) => {                                         match tx.send(req).await {                                             Ok(_) => {                                                 sent += 1;                                                 if sent >= 4096 { break; }                                             }                                             Err(e) => {                                                 tracing::warn!("subreq send failed: {e}");                                                 break 'session;                                             }                                         }                                     },                                     GeyserAction::UnsubscribeAll => {}                                 }                             }                         }                     }                     if ctx.cancel.is_cancelled() { break 'reconnect Ok(()); }                 }                  let sleep = backoff_ms.min(backoff_max_ms);                 tokio::time::sleep(Duration::from_millis(sleep)).await;                 backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;             }         })?;          Ok(())     } } 

Ну что, делаем SolanaTxStream для отправки лидерам?

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Развиваем тему?

66.67%Да2
33.33%Нет1
0%Если только будут HFT плюшки0

Проголосовали 3 пользователя. Воздержались 2 пользователя.

ссылка на оригинал статьи https://habr.com/ru/articles/939966/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *