Что ж, да, вы не ослышались — прямо сейчас мы с вами подготовим свой движок для поддержания такого бэкенда, как у CoinMarketCup (кмк). И писать будем на моем любимом Rust. Использовать под капотом будем мою либу TitanRt, которую я лениво и скомкано презентовал в предыдущем посте. Постараюсь быть полезным и последовательным.
Осторожно: много кода (с комментариями)…
Зависимости
[package] name = "token_scanner" version = "0.1.0" edition = "2024" [dependencies] titanrt = { version = "0.3.0", features = ["reqwest_conn"] } # serde = { version = "1.0.219", features = ["derive"] } anyhow = "1.0.99" tracing-subscriber = "0.3.20" tracing = "0.1.41" config = "0.15.15"
Заготовка main.rs
fn main() -> anyhow::Result<()> { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); // подключаем банальный трейсинг let cfg = RuntimeConfig { init_model_on_start: true, // запускаем модель сразу до начала лупа (не посылаем команду Start) core_id: Some(1), // пиним поток рантайма к ядру max_inputs_pending: Some(128), // до скольких элементов можно забить очередь, если не успеваем обработать (в рантайм можно отправлять входные события) max_inputs_drain: None, // сколько максимум дрейним на итерации входных событий (если None, то max_inputs_pending) stop_model_timeout: Some(5), // максимальное время ожидания модели для остановки внутри лупа }; // вычисляем путь к файлу с конфигом let model_cfg_path = if let Some(path) = env::args().nth(1) { path } else { "./model_cfg".to_string() }; // загружаем конфиг tracing::info!("Loading model config from: {}", model_cfg_path); let model_cfg = load_model_cfg(model_cfg_path)?; // модель будет отсылать события, которые мы будем прослушивать и сами предпринимать действия // создаем канал взаимодействия между моделью и текущим потоком let (output_tx, mut output_rx) = MpmcChannel::unbounded::<Output<OuputEvent>>(); // пока нам не нужен никакой контекст, все берем из конфига // юзаем удобный маркер из либы let model_ctx = NullModelCtx; // запускаем отдельный поток с рантаймом, где будет крутиться обслуживающий busy-loop и дергаться модель let rt = Runtime::<TokenScannerModel>::spawn(cfg, model_ctx, model_cfg, output_tx)?; let mut outputs_count = 0; // принимаем от модели события и просто принтим их (здесь может быть сложная логика, например, для взаимодействия с бэком) loop { if outputs_count > 1000 { break Ok(()); } while let Ok(output) = output_rx.try_recv() { tracing::info!("{:?}", output); outputs_count += 1; } // просто держим поток в течение 100 мс std::thread::sleep(std::time::Duration::from_millis(100)); } } // вспомогательная функция для загрузки конфига fn load_model_cfg(path: String) -> anyhow::Result<TokenScannerConfig> { let cfg = Config::builder() .add_source(config::File::from(PathBuf::from(&path))) .build() .with_context(|| format!("failed to read model config from {path}"))?; let model: TokenScannerConfig = cfg .try_deserialize() .with_context(|| format!("failed to deserialize model config from {path}"))?; Ok(model) }
Предварительно реализуем саму модель
pub struct TokenScannerModel {} impl BaseModel for TokenScannerModel { type Config = TokenScannerConfig; // тип конфига type OutputTx = MpmcSender<Output<OuputEvent>>; // указываем имплементацию трансмиссии type OutputEvent = OuputEvent; // событие трансмиссии type Event = NullEvent; // не шлем события в модель type Ctx = NullModelCtx; // не используем контекст fn initialize( ctx: Self::Ctx, config: Self::Config, reserved_core_id: Option<usize>, output_tx: Self::OutputTx, cancel_token: CancelToken, ) -> anyhow::Result<Self> { todo!() } fn execute(&mut self) -> ExecutionResult { todo!() } // не шлем в модель извне никаких событий (могли бы через объект рантайма) fn on_event(&mut self, event: Self::Event, meta: Option<InputMeta>) { unimplemented!() } // останавливаемся сразу fn stop(&mut self, kind: StopKind) -> StopState { // StopState::InProgress StopState::Done } // не релоадим конфиги на лету fn hot_reload(&mut self, config: &Self::Config) -> anyhow::Result<()> { unimplemented!() } }
Что делаем во время инициализации модели
Нам нужен стрим, который будет в отдельном потоке принимать команды от модели и асинхронно отправлять запросы на биржи, чтобы фетчить данные.
В версии 0.3.0 я добавил такой универсальный http-коннектор с подобным стримом на базе reqwest. Работает в одном треде в цикле в такой схеме:
-
В начале итерации проверяет очередь команд, если не пустая, то создает фоновые токио таски через local_spawn, внутри которой идет проверка встроенного рейт лимит менеджера, затем уже отправка запроса на сервер. После ожидания ответа отправляем через mcmp канал ответ в наш основной цикл.
-
Идет проверка на готовые ответы. Если ответы имеются, то вызываем модельный хук и передаем все нужное, в том числе сырой ответ.
-
Даем токио рантайму немного продышаться и выполнить таски.
-
Конец итерации
Просто возьмем из titanrt уже готовый коннектор и заспавним стрим прямо в модели во время инита:
pub struct TokenScannerModel { reqwest_stream: Stream<RingSender<ReqwestAction>, RingReceiver<CryptoEvent>, NullState>, // держим стрим для отправки http запросов и вызова хука output_tx: MpmcSender<Output<OuputEvent>>, // держим отправителя для отправки внутренний событий в вызывающий код } impl BaseModel for TokenScannerModel { ... fn initialize( _ctx: Self::Ctx, config: Self::Config, reserved_core_id: Option<usize>, output_tx: Self::OutputTx, cancel_token: CancelToken, ) -> anyhow::Result<Self> { // инитим коннектор (могли бы и закрепить его в структуре, чтобы можно было всегда заново создать стрим или еще один) let mut conn = ReqwestConnector::init( config.reqwest_conn, // вынесем конфиг коннектора в модельный конфиг cancel_token.new_child(), // модельный токен будет родителем, если отменим стрим, то модель останется reserved_core_id.map(|c| vec![c]), // укажем зарезервированное ядро )?; // для ясности: // let mut desc = ReqwestStreamDescriptor { // max_hook_calls_at_once: 10, // максимальное количество вызовов хуков в одной итерации (по сути, сколько дреним ответов за раз) // wait_async_tasks_us: 0, // сколько даем поспать токио рантайма - основное место для джиттера // max_pending_actions: None, // максимальное количество команд, которые стрим держит необработанными в очереди // max_pending_events: None, // максимальное количество событий, которые модель держит необработанными в очереди // core_pick_policy: Some(CorePickPolicy::Specific(2)), // привяжем поток стрима на отдельное ядро процессора // rate_limits: config.reqwest_rate_limits.clone(), // }; // короче: let mut desc = ReqwestStreamDescriptor::low_latency(); for rl_cfg in config.reqwest_rate_limits.iter() { desc.add_rate_limit(rl_cfg.clone()); } // наконец, спавним стрим в отдельном потоке через коннектор // передаем объем дескриптора и кастомный модельный hook, реализуемый в другом месте let reqwest_stream = conn.spawn_stream(desc, crypto_hook)?; // ждем пока стрим не запуститься // например, могли накосячить с конфигом и стрим не запустился let deadline = Instant::now() + Duration::from_secs(10); while !reqwest_stream.is_healthy() { if Instant::now() >= deadline { return Err(anyhow::anyhow!( "stream didn't become healthy within 10 seconds", )); } sleep(Duration::from_secs(1)); } Ok(Self { output_tx, reqwest_stream, }) } }
Конфигурацию мы оставим на потом, пока просто примем во внимание, что она есть.
Как сейчас выглядит crypto_hook
#[derive(Debug, Clone)] pub struct CryptoEvent {} pub fn crypto_hook( args: HookArgs< ReqwestEvent, // сырое событие, которое детерминируется на уровне реализации стрима RingSender<CryptoEvent>, // будем отправлять из хука спарсенные события NullReducer, // можете реализовать на случай, если нужен стейт внутри стрима NullState, // это если нужен стейт, который будете лоадить в модели ReqwestStreamDescriptor, // из-за этого имеем доступ к нашему дескриптору внутри хука >, ) { if args.raw.is_success() { } else { } }
Скоро мы это исправим, но прежде мы должны понять, что мы вообще делаем.
Бизнес логика — execute()
А что мы вообще делаем, собственно? Титан райнтайм запустит модель и будет шарашить model.execute() миллионы раз в секунду. Чем толще и массивнее execute, тем меньше итераций в лупе рантайме в секунду. И это основное место, где реализуется бизнес логика модели (или торговая, если хотите).
Очевидно, на экзекьюте нам нужно реализовать логику отправки команд в наш стрим, а также дренить (высушивать) наш ресивер, через который мы отправляем спарсенные ответы в хуке.
Давайте подготовим минимальный плацдарм просто для наглядности:
impl BaseModel for TokenScannerModel { ... fn execute(&mut self) -> ExecutionResult { let events = self.reqwest_stream.drain_max(); // полностью осушаем ресивер for event in events { tracing::debug!("got event: {:?}", event); } if self.reqwest_stream.is_healthy() { // экшены детерминируются реализацией стрима, поэтому юзаем то, что нам дают (или сами реализуем стрим) // здесь имеется в виду, что мы билдим GET запрос, передаем рл контекст, чтобы можно было отыскать конфигурацию рейт лимитера let action = ReqwestAction::get( Url::parse( "https://api.bybit.com/v5/market/orderbook?category=spot&symbol=BTCUSDT", ) .unwrap(), ) .rl_ctx(RateLimitContext::new("bybit")) .build(); // и просто отправляем экшен в стрим, чтобы в ответ на него получить событие в нашем хуке match self.reqwest_stream.try_send(action) { Ok(()) => {} Err(e) => { tracing::error!("error sending reqwest action: {}", e); } } } ExecutionResult::Relax // чтобы не жечь ядро и чутка тормозить busy-loop } }
Естественно, это не просто минимальный плацдарм, а скорее пустышка в демонстрационных целях, как можно за минуту реализовать получение снэпшотов ордербука с bybit внутри execute. Если мы это запустим, то даже событий никаких не будет, так как у нас пустой хук. Тем не менее, коллеги, это точка роста и начало программистской мысли.
В следующей части
-
Реализуем OutputEvent — событие, которое шлет модель время от времени в вызывающий код, например, для бэкенда. Это может быть снимок состояния токенов, которые мы сканируем. Или дельты по конкретному токену для сохранения в базу.
-
Реализуем CryptoEvent — событие, в которое парсим сырой ответ из хука. Это будет событие с req_id, а так же внутренним enum, чтобы не ограничиваться только запросами снэпшотов стакана, а использовать разные данные с бирж, в том числе Ticker Info. И сделаем его унифицированным, чтобы не ограничиваться только биржей байбит или рынком спот.
-
Реализуем хук и парсинг сырых событий с разных источников. Возьмем 2: bybit + binance.
-
Реализуем тайминги внутри execute и дополнительные структуры для отправки экшенов. Так же поймем, что будем делать с объектами CryptoEvent и в какой редьюсер их собирать, чтобы затем высылать output_tx.
-
Расширим конфигурацию модели и обсудим ее подробнее.
ссылка на оригинал статьи https://habr.com/ru/articles/944308/
Добавить комментарий