Сегодня решил собрать воедино то, что я знаю о разработке высокочастотных систем в связке с Rust. Из кусков кода, по сусекам, что называется, по репозиториям наскреб и склеил с помощью готопоты достойную к вашему вниманию либу. Библа позволит сохранить время всем, кто стремиться одновременно и к скорости, и гибкости. Планирую сам активно юзать, чтобы перейти со старой асинхронной торговой инфры в истинные треды с закосом под ультра. И развивать либу под брендом TitanRt. А если сообществу зайдет, так вообще мотивации прибавиться.
Итак, TitanRt — typed reactive runtime для построения реактивных, низколатентных систем на Rust.
Если упростить: это минималистичная основа для приложений, которые живут в цикле событий, где важны:
-
миллисекунды (а иногда и наносекунды)
-
предсказуемая обратная нагрузка (back-pressure)
-
чёткий контроль жизненного цикла модели и её воркеров
-
максимальная гибкость в области более высокоуровневой разработки
-
возможность прибить всё красиво и быстро, а не висеть на zombie-тредах
Зачем ещё один рантайм?
Rust уже имеет Tokio, Actix и прочих асинхронных гигантов. Но они решают другую задачу — высокоуровневый async/await, HTTP-сервисы, очереди.
TitanRt сделан для систем реального времени:
-
торговые движки (HFT/market making)
-
анализ рыночных потоков
-
телеметрия и алертинг
-
системы с жёстким контролем задержки
То есть там, где:
-
нельзя позволить себе лишний аллокатор
-
хочется контролировать ядро CPU, на котором работает поток
-
нужны строго типизированные каналы между моделью и воркерами
Архитектура
Вместо гигантского фреймворка TitanRt — это всего пара простых идей:
-
Model-first:
Ваша бизнес-логика =BaseModel.
Она сама создаёт коннекторы и стримы, сама ими управляет, хотя не детерминирует это поведение. Вы можете управлять извне через Control Plane слой. -
Connector / Stream layer:
Коннектор = фабрика стримов.
Стрим = воркер-тред с типизированными каналами и вкуснятиной в виду StateCell<T>, где под капотом arc-swap. Со стримом можно общаться разными способами: через ваш типизированный Action, который вы отправляете через ваш типизированный Tx<Action>, а так же через типизированный hook, в который приходит сырое событие стрима, ваш Rx<Event> и StateCell<T> — здесь вы выбираете, что делать с данными, чтобы оркестрировать этим цикле модели. -
Runtime:
Небольшой управляющий поток, который гоняет команды:Start,Stop,Restart,HotReload,Shutdown. Ну и саму модель, на которой вызывается ваш импл execute() или event() — если вы вдруг гоняете события извне рантайма.
В общем и целом, рантайм не знает про ваши протоколы и бизнес-логику — только управляет жизненным циклом модели.
Не совсем детальная, но какая-никакая схема:
┌──────────────────────────┐ │ Runtime │ │ Start/Stop/Restart/... │ └───────────▲─────────────┘ │ ┌─────────┴─────────┐ │ Model │ │ owns connectors │ └─────────┬─────────┘ │ ┌───────────▼───────────┐ │ Connector(s) │ │ spawn Stream(s) │ └───────────┬───────────┘ │ Actions ───►│ │◄─── Events ▼ Stream
Как это выглядит в коде
Минимальная модель:
use titanrt::model::{BaseModel, ExecutionResult, StopKind, StopState}; use titanrt::utils::CancelToken; use anyhow::Result; #[derive(Clone, Debug)] struct MyConfig { greeting: String, } #[derive(Clone, Debug)] struct MyEvent(String); #[derive(Clone)] struct MyOutputTx; // no-op impl titanrt::io::base::BaseTx for MyOutputTx { type EventType = String; fn try_send(&mut self, v: String) -> Result<(), titanrt::error::SendError<String>> { println!("OUT: {v}"); Ok(()) } fn send( &mut self, v: String, _: &CancelToken, _: Option<std::time::Duration>, ) -> Result<(), titanrt::error::SendError<String>> { self.try_send(v) } } struct MyModel { cfg: MyConfig, out: MyOutputTx, } impl BaseModel for MyModel { type Config = MyConfig; type OutputTx = MyOutputTx; type Event = MyEvent; type Ctx = (); fn initialize( _ctx: (), config: MyConfig, _core_id: Option<usize>, output_tx: MyOutputTx, _cancel: CancelToken, ) -> Result<Self> { Ok(Self { cfg: config, out: output_tx }) } fn execute(&mut self) -> ExecutionResult { let _ = self.out.try_send(format!("{}!", self.cfg.greeting)); ExecutionResult::Relax } fn on_event(&mut self, event: MyEvent) { let _ = self.out.try_send(format!("event: {}", event.0)); } fn stop(&mut self, _kind: StopKind) -> StopState { StopState::Done } }
А потом — просто запускаем:
fn main() -> Result<()> { let cfg = titanrt::config::RuntimeConfig { init_model_on_start: true, core_id: None, max_inputs_pending: Some(1024), max_inputs_drain: Some(64), stop_model_timeout: Some(5), }; let rt = titanrt::runtime::Runtime::<MyModel>::spawn( cfg, NullModelCtx, MyConfig { greeting: "Hello, TitanRt".into() }, MyOutputTx, )?; rt.run_blocking()?; Ok(()) }
Чем это отличается от Tokio?
-
Без async/await: здесь честные треды, без скрытого планировщика. Тем не менее, планируется добавить опциональный токио рантайм из коробки.
-
CPU pinning: можно закрепить поток за ядром. Хочется в будущих версиях добавить настройку планировщика и приоритетность.
-
Typed I/O: никакого
Any, всё компилируется статически. Строго. -
Минимум зависимостей:
crossbeam,arc-swap,ahash, ringbuf,немного serde.
Когда использовать TitanRt
-
Нужно максимально предсказуемое поведение под нагрузкой.
-
Ваш код живёт в бесконечном цикле: обработка стакана, телеметрии, сигналов.
-
Вы пишете HFT-движок, рынок данных, RT-алертинг или даже какую-нибудь игровую сетевую петлю.
А если нужен HTTP, gRPC, база и веб — берите Tokio/Actix, TitanRt не про это.
Репозиторий и документация
Итог
TitanRt — это скелет для реактивных систем, где важна латентность и контроль.
Вы пишете модель и коннекторы → модель создаёт коннекторы и стримы → стримы гоняют события → рантайм следит за жизненным циклом.
Просто, предсказуемо и типобезопасно.
Плюшка в виде простого yellowstone-grpc стрима:
pub struct CompositeConnector { pub(crate) config: CompositeConfig, pub(crate) cancel_token: CancelToken, pub(crate) core_stats: Option<Arc<CoreStats>>, } impl BaseConnector for CompositeConnector { type Config = CompositeConfig; fn init( config: Self::Config, cancel_token: CancelToken, reserved_core_ids: Option<Vec<usize>>, ) -> anyhow::Result<Self> { let core_stats = if config.with_core_stats { Some(CoreStats::new( config.default_max_cores, config.specific_cores.clone(), reserved_core_ids.unwrap_or_default(), )?) } else { None }; Ok(Self { config, cancel_token, core_stats, }) } fn name(&self) -> impl AsRef<str> + Display { "CompositeConnector" } fn config(&self) -> &Self::Config { &self.config } fn cancel_token(&self) -> &CancelToken { &self.cancel_token } fn cores_stats(&self) -> Option<Arc<CoreStats>> { self.core_stats.clone() } } #[derive(Clone)] pub enum GeyserAction { Subscribe(SubscribeRequest), UnsubscribeAll, } #[derive(Clone, Debug)] pub struct GeyserGrpcDescriptor { pub endpoint: String, pub auth_token: Option<String>, pub max_pending_actions: Option<usize>, pub max_pending_events: Option<usize>, pub core_pick_policy: CorePickPolicy, pub subscription: Option<SubscribeRequest>, } impl GeyserGrpcDescriptor { pub fn new( endpoint: String, auth_token: Option<String>, max_pending_actions: Option<usize>, max_pending_events: Option<usize>, core_pick_policy: CorePickPolicy, ) -> Self { Self { endpoint, auth_token, max_pending_actions, max_pending_events, core_pick_policy, subscription: None, } } pub fn with_subscription(mut self, sub: SubscribeRequest) -> Self { self.subscription = Some(sub); self } } impl StreamDescriptor for GeyserGrpcDescriptor { fn venue(&self) -> impl Venue { Venues::Solana } fn kind(&self) -> impl Kind { "YellowstoneGrpc" } fn max_pending_actions(&self) -> Option<usize> { self.max_pending_actions } fn max_pending_events(&self) -> Option<usize> { self.max_pending_events } fn core_pick_policy(&self) -> Option<CorePickPolicy> { Some(self.core_pick_policy) } fn health_at_start(&self) -> bool { false } } #[derive(Clone, Debug)] pub enum GeyserEvent { Raw(UpdateOneof), } impl<E, S> StreamSpawner<GeyserGrpcDescriptor, E, S> for CompositeConector where S: StateMarker, E: BaseTx + TxPairExt, { } impl<E, S> StreamRunner<GeyserGrpcDescriptor, E, S> for CompositeConector where S: StateMarker, E: BaseTx, { type Config = (); type ActionTx = RingSender<GeyserAction>; type RawEvent = GeyserEvent; type Hook = fn(&Self::RawEvent, &mut E, &StateCell<S>); fn build_config(&mut self, _desc: &GeyserGrpcDescriptor) -> anyhow::Result<Self::Config> { Ok(()) } fn run( mut ctx: RuntimeCtx<GeyserGrpcDescriptor, Self, E, S>, hook: Self::Hook, ) -> StreamResult<()> { // Однопоточный рантайм ТОЛЬКО внутри run (внешний API остаётся синхронным) let rt = Builder::new_current_thread() .enable_time() .enable_io() .build() .map_err(|e| StreamError::Unknown(anyhow!(e)))?; let mut rng = SmallRng::from_os_rng(); let mut backoff_ms: u64 = 50; let backoff_max_ms: u64 = 5_000; let backoff_mul: f64 = 1.7; if !ctx.desc.health_at_start() { ctx.health.set(true); } rt.block_on(async move { 'reconnect: loop { if ctx.cancel.is_cancelled() { break Ok(()); } if backoff_ms > 50 { let j = rng.random_range(0..(backoff_ms / 5 + 1)); tokio::time::sleep(Duration::from_millis(j)).await; } let mut client = match GeyserGrpcClient::build_from_shared(ctx.desc.endpoint.clone()) .map_err(|e| StreamError::Unknown(anyhow!(e)))? .x_token(ctx.desc.auth_token.clone()) .map_err(|e| StreamError::Unknown(anyhow!(e)))? .tls_config(ClientTlsConfig::new().with_native_roots()) .map_err(|e| StreamError::Unknown(anyhow!(e)))? .connect().await { Ok(c) => c, Err(e) => { tracing::warn!("grpc connect error: {e}"); let sleep = backoff_ms.min(backoff_max_ms); tokio::time::sleep(Duration::from_millis(sleep)).await; backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64; continue 'reconnect; } }; let (mut tx, mut rx) = match client.subscribe().await { Ok(x) => x, Err(e) => { tracing::warn!("subscribe call error: {e}"); let sleep = backoff_ms.min(backoff_max_ms); tokio::time::sleep(Duration::from_millis(sleep)).await; backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64; continue 'reconnect; } }; if let Some(req) = ctx.desc.subscription.as_ref() { if let Err(e) = tx.send(req.clone()).await { tracing::warn!("send initial subreq: {e}"); } } ctx.health.set(true); backoff_ms = 50; let mut tick = tokio::time::interval(Duration::from_millis(1)); tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 'session: loop { tokio::select! { biased; m = rx.next() => { match m { Some(Ok(update)) => { match update.update_oneof { Some(UpdateOneof::Ping(_)) => { tracing::debug!("ping update"); let _ = tx.send(SubscribeRequest { ping: Some(SubscribeRequestPing {id: 0}), ..Default::default() }).await; } Some(u) => { // А вот и хук из модели: модель сама решает, что делать hook(&GeyserEvent::Raw(u), &mut ctx.event_tx, &ctx.state); } None => {} } } Some(Err(status)) => { tracing::warn!("grpc stream error: {status}"); ctx.health.set(false); break 'session; } None => { tracing::warn!("grpc stream ended"); ctx.health.set(false); break 'session; } } } _ = tick.tick() => { let mut sent = 0usize; Проверим команды из модели while let Ok(a) = ctx.action_rx.try_recv() { match a { GeyserAction::Subscribe(req) => { match tx.send(req).await { Ok(_) => { sent += 1; if sent >= 4096 { break; } } Err(e) => { tracing::warn!("subreq send failed: {e}"); break 'session; } } }, GeyserAction::UnsubscribeAll => {} } } } } if ctx.cancel.is_cancelled() { break 'reconnect Ok(()); } } let sleep = backoff_ms.min(backoff_max_ms); tokio::time::sleep(Duration::from_millis(sleep)).await; backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64; } })?; Ok(()) } }
Ну что, делаем SolanaTxStream для отправки лидерам?
ссылка на оригинал статьи https://habr.com/ru/articles/939966/
Добавить комментарий