Lock-free структуры данных в Rust

от автора

Привет, Хабр! Сегодня поговорим о lock-free (или же без использования блокировок) структурах данных и атомарных операциях в Rust.

Каждый lock может стать узким местом, тормозящим всю систему. Базовые методы синхронизации, типо мьютексов и семафор, частенько (но не всегда) снижают производительность из-за блокировок и контекстных переключений.

lock-free структуры данных позволяют нескольким потокам одновременно читать и изменять данные без блокировок.

Основы lock-free программирования в Rust

Rust дает безопасность многопоточной работы благодаря своей модели владения и системе типов:

  1. Модель владения

    • Каждый ресурс в Rust имеет владельца, и только один поток может быть владельцем ресурса в конкретный момент времени. Это предотвращает возможность конкурентного доступа к одному и тому же ресурсу без синхронизации.

    • Жизненные циклы контролят время жизни ссылок, предотвращая использование недействительных ссылок.

  2. Типы и проверка на этапе компиляции

    • Типаж Send: этот типаж указывает, что тип данных может быть безопасно передан между потоками.

    • Типаж Sync: указывает, что тип данных может быть безопасно доступен из нескольких потоков одновременно.

Rust проверяет правильность использования типажей Send и Sync на этапе компиляции.

use std::sync::Arc; use std::thread;  fn main() {     let data = Arc::new(vec![1, 2, 3]);     let mut handles = vec![];      for _ in 0..3 {         let data = Arc::clone(&data);         let handle = thread::spawn(move || {             println!("{:?}", data);         });         handles.push(handle);     }      for handle in handles {         handle.join().unwrap();     } }

Здесь юзаем Arc для безопасного разделения данных между потоками. Типаж Send позволяет перемещать Arc между потоками, а Sync дает безопасный доступ к данным.

Атомарные типы данных в Rust позволяют выполнять операции над данными без блокировок.

Основные атомарные типы данных в стандартной библиотеке Rust включают:

  1. AtomicBool

    • Атомарная булевая переменная.

    • Пример:

      use std::sync::atomic::{AtomicBool, Ordering}; use std::thread;  let flag = AtomicBool::new(false);  let handle = thread::spawn(move || {     flag.store(true, Ordering::Relaxed); });  handle.join().unwrap(); assert_eq!(flag.load(Ordering::Relaxed), true);
  2. AtomicIsize и AtomicUsize

    • Атомарные целые числа (signed и unsigned).

    • Пример:

      use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread;  let counter = AtomicUsize::new(0); let handles: Vec<_> = (0..10).map(|_| {     let counter = &counter;     thread::spawn(move || {         for _ in 0..1000 {             counter.fetch_add(1, Ordering::SeqCst);         }     }) }).collect();  for handle in handles {     handle.join().unwrap(); }  assert_eq!(counter.load(Ordering::SeqCst), 10000);
  3. AtomicPtr

    • Атомарный указатель.

    • Пример:

      use std::sync::atomic::{AtomicPtr, Ordering}; use std::ptr;  let mut data = 5; let atomic_ptr = AtomicPtr::new(&mut data);  let new_data = 10; atomic_ptr.store(&mut new_data, Ordering::SeqCst);  assert_eq!(unsafe { *atomic_ptr.load(Ordering::SeqCst) }, 10);

Атомарные операции обеспечивают безопасность и целостность данных без необходимости использования блокировок!

Crossbeam для lock-free структур данных

Библиотека crossbeam предназначена для упрощения многопоточного программирования и высокопроизводительных безопасных структур данных lock-free.

Основные элементы Crossbeam:

  • Структуры данных: многопоточные структуры данных, такие как очереди и деки.

  • Epoch-based garbage collection: механизм управления памятью, который минимизирует блокировки при сборе мусора.

  • Concurrency primitives: синхронизационные примитивы для упрощения написания конкурентного кода.

ArrayQueue

ArrayQueue представляет собой lock-free очередь, реализованную на массиве с фикс. размером. Эта очередь идеально подходит для сценариев, где известно макс. количество элементов, которое нужно хранить. Пример:

use crossbeam_queue::ArrayQueue; use std::sync::Arc; use std::thread;  fn main() {     let queue = Arc::new(ArrayQueue::new(100));     let mut handles = vec![];      for i in 0..5 {         let queue = Arc::clone(&queue);         handles.push(thread::spawn(move || {             for j in 0..20 {                 queue.push(i * 20 + j).unwrap();             }         }));     }      for handle in handles {         handle.join().unwrap();     }      while let Ok(val) = queue.pop() {         println!("{}", val);     } }

Несколько потоков добавляют элементы в ArrayQueue, а затем элементы извлекаются из очереди.

SegQueue

SegQueue — это lock-free очередь с динамическим расширением, которая подходит для сценариев с неопределенным количеством элементов. Пример:

use crossbeam_queue::SegQueue; use std::sync::Arc; use std::thread;  fn main() {     let queue = Arc::new(SegQueue::new());     let mut handles = vec![];      for i in 0..5 {         let queue = Arc::clone(&queue);         handles.push(thread::spawn(move || {             for j in 0..20 {                 queue.push(i * 20 + j);             }         }));     }      for handle in handles {         handle.join().unwrap();     }      while let Some(val) = queue.pop() {         println!("{}", val);     } }

SegQueue используется аналогично ArrayQueue, но не ограничена фиксированным размером.

epoch-based garbage collection

Epoch-based garbage collection юзают для управления памятью без блокировок. Основная фича состоит в том, чтобы разделить время на epochs и отслеживать, когда каждый поток находится в активном состоянии.

Из чего состоит epoch-based GC:

  • Global epoch counter: глобальный счетчик эпох, который отслеживает текущую эпоху.

  • Thread-local epoch counter: локальный для потока счетчик эпох, который обновляется при каждом входе в критическую секцию.

  • Garbage lists: списки мусора для каждой epoch, в которые добавляются освобождаемые объекты.

Пример:

use crossbeam_epoch as epoch; use std::sync::atomic::{AtomicPtr, Ordering}; use std::ptr;  struct Node {     value: i32,     next: AtomicPtr<Node>, }  fn main() {     let n1 = Box::into_raw(Box::new(Node {         value: 1,         next: AtomicPtr::new(ptr::null_mut()),     }));      let n2 = Box::into_raw(Box::new(Node {         value: 2,         next: AtomicPtr::new(n1),     }));      let head = AtomicPtr::new(n2);      epoch::pin(|scope| {         let h = head.load(Ordering::Relaxed, scope);         unsafe {             if !h.is_null() {                 println!("Value: {}", (*h).value);             }         }     });      unsafe {         drop(Box::from_raw(n1));         drop(Box::from_raw(n2));     } }

Создали простую lock-free структуру данных и юзаем epoch-based GC для управления памятью.

Примеры использования

Многопоточная обработка событий в серверной системе

В серверных приложениях нужно обрабатывать множество событий, поступающих от различных клиентов. С lock-free очередями можно избежать задержек, связанных с блокировками:

use crossbeam_queue::SegQueue; use std::sync::Arc; use std::thread; use std::time::Duration;  struct Event {     id: u32,     payload: String, }  fn main() {     let event_queue = Arc::new(SegQueue::new());     let mut handles = vec![];      // поток, добавляющий события в очередь     let producer_queue = Arc::clone(&event_queue);     handles.push(thread::spawn(move || {         for i in 0..100 {             let event = Event {                 id: i,                 payload: format!("Event {}", i),             };             producer_queue.push(event);             thread::sleep(Duration::from_millis(10));         }     }));      // поток, обрабатывающий события из очереди     let consumer_queue = Arc::clone(&event_queue);     handles.push(thread::spawn(move || {         while let Some(event) = consumer_queue.pop() {             println!("Processing event id: {}", event.id);             thread::sleep(Duration::from_millis(20));         }     }));      for handle in handles {         handle.join().unwrap();     } }

Один поток добавляет события в lock-free очередь SegQueue, а другой поток извлекает и обрабатывает их.

Пул потоков для веб-сервера

Пул потоков в основном юзают в веб-серверах для обработки запросов клиентов. lock-free здесь тоже находят свое применение:

use crossbeam_queue::SegQueue; use std::sync::{Arc, Mutex}; use std::thread; use std::sync::mpsc::{self, Sender};  struct ThreadPool {     workers: Vec<Worker>,     sender: Sender<Job>, }  type Job = Box<dyn FnOnce() + Send + 'static>;  impl ThreadPool {     fn new(size: usize) -> ThreadPool {         let (sender, receiver) = mpsc::channel();         let receiver = Arc::new(Mutex::new(receiver));         let mut workers = Vec::with_capacity(size);          for id in 0..size {             workers.push(Worker::new(id, Arc::clone(&receiver)));         }          ThreadPool { workers, sender }     }      fn execute<F>(&self, f: F)     where         F: FnOnce() + Send + 'static,     {         let job = Box::new(f);         self.sender.send(job).unwrap();     } }  struct Worker {     id: usize,     thread: Option<thread::JoinHandle<()>>, }  impl Worker {     fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {         let thread = thread::spawn(move || loop {             let job = receiver.lock().unwrap().recv().unwrap();             println!("Worker {} got a job; executing.", id);             job();         });          Worker {             id,             thread: Some(thread),         }     } }  fn main() {     let pool = ThreadPool::new(4);      for i in 0..8 {         pool.execute(move || {             println!("Processing request {}", i);         });     } }

Пул потоков использует канал mpsc для передачи задач.

Concurrent Hash Map

С Concurrent Hash Map можно круто кэшировать данные, разделяемые между потоками, без необходимости блокировок:

use std::sync::Arc; use dashmap::DashMap; use std::thread;  fn main() {     let cache = Arc::new(DashMap::new());     let mut handles = vec![];      // потоки, добавляющие данные в кэш     for i in 0..5 {         let cache = Arc::clone(&cache);         handles.push(thread::spawn(move || {             for j in 0..10 {                 cache.insert(i * 10 + j, format!("Value {}", j));             }         }));     }      // потоки, читающие данные из кэша     for i in 0..5 {         let cache = Arc::clone(&cache);         handles.push(thread::spawn(move || {             for j in 0..10 {                 if let Some(value) = cache.get(&(i * 10 + j)) {                     println!("Read from cache: {}", value);                 }             }         }));     }      for handle in handles {         handle.join().unwrap();     } }

Юзаем DashMap, которая обеспечивает lock-free операции для вставки и чтения данных.


На моем опыте было замечено, что с lock-free структурами данных можно сократить время выполнения задач на 30-50% по сравнению с использованием мьютексов.

Больше про языки программирования эксперты OTUS рассказывают в рамках практических онлайн-курсов. С полным каталогом курсов можно ознакомиться по ссылке.


ссылка на оригинал статьи https://habr.com/ru/articles/821629/