Привет, Хабр! Сегодня поговорим о lock-free (или же без использования блокировок) структурах данных и атомарных операциях в Rust.
Каждый lock может стать узким местом, тормозящим всю систему. Базовые методы синхронизации, типо мьютексов и семафор, частенько (но не всегда) снижают производительность из-за блокировок и контекстных переключений.
lock-free структуры данных позволяют нескольким потокам одновременно читать и изменять данные без блокировок.
Основы lock-free программирования в Rust
Rust дает безопасность многопоточной работы благодаря своей модели владения и системе типов:
-
Модель владения
-
Каждый ресурс в Rust имеет владельца, и только один поток может быть владельцем ресурса в конкретный момент времени. Это предотвращает возможность конкурентного доступа к одному и тому же ресурсу без синхронизации.
-
Жизненные циклы контролят время жизни ссылок, предотвращая использование недействительных ссылок.
-
-
Типы и проверка на этапе компиляции
-
Типаж
Send
: этот типаж указывает, что тип данных может быть безопасно передан между потоками. -
Типаж
Sync
: указывает, что тип данных может быть безопасно доступен из нескольких потоков одновременно.
-
Rust проверяет правильность использования типажей Send
и Sync
на этапе компиляции.
use std::sync::Arc; use std::thread; fn main() { let data = Arc::new(vec![1, 2, 3]); let mut handles = vec![]; for _ in 0..3 { let data = Arc::clone(&data); let handle = thread::spawn(move || { println!("{:?}", data); }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } }
Здесь юзаем Arc
для безопасного разделения данных между потоками. Типаж Send
позволяет перемещать Arc
между потоками, а Sync
дает безопасный доступ к данным.
Атомарные типы данных в Rust позволяют выполнять операции над данными без блокировок.
Основные атомарные типы данных в стандартной библиотеке Rust включают:
-
AtomicBool
-
Атомарная булевая переменная.
-
Пример:
use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; let flag = AtomicBool::new(false); let handle = thread::spawn(move || { flag.store(true, Ordering::Relaxed); }); handle.join().unwrap(); assert_eq!(flag.load(Ordering::Relaxed), true);
-
-
AtomicIsize и AtomicUsize
-
Атомарные целые числа (signed и unsigned).
-
Пример:
use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; let counter = AtomicUsize::new(0); let handles: Vec<_> = (0..10).map(|_| { let counter = &counter; thread::spawn(move || { for _ in 0..1000 { counter.fetch_add(1, Ordering::SeqCst); } }) }).collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(counter.load(Ordering::SeqCst), 10000);
-
-
AtomicPtr
-
Атомарный указатель.
-
Пример:
use std::sync::atomic::{AtomicPtr, Ordering}; use std::ptr; let mut data = 5; let atomic_ptr = AtomicPtr::new(&mut data); let new_data = 10; atomic_ptr.store(&mut new_data, Ordering::SeqCst); assert_eq!(unsafe { *atomic_ptr.load(Ordering::SeqCst) }, 10);
-
Атомарные операции обеспечивают безопасность и целостность данных без необходимости использования блокировок!
Crossbeam для lock-free структур данных
Библиотека crossbeam предназначена для упрощения многопоточного программирования и высокопроизводительных безопасных структур данных lock-free.
Основные элементы Crossbeam:
-
Структуры данных: многопоточные структуры данных, такие как очереди и деки.
-
Epoch-based garbage collection: механизм управления памятью, который минимизирует блокировки при сборе мусора.
-
Concurrency primitives: синхронизационные примитивы для упрощения написания конкурентного кода.
ArrayQueue
ArrayQueue представляет собой lock-free очередь, реализованную на массиве с фикс. размером. Эта очередь идеально подходит для сценариев, где известно макс. количество элементов, которое нужно хранить. Пример:
use crossbeam_queue::ArrayQueue; use std::sync::Arc; use std::thread; fn main() { let queue = Arc::new(ArrayQueue::new(100)); let mut handles = vec![]; for i in 0..5 { let queue = Arc::clone(&queue); handles.push(thread::spawn(move || { for j in 0..20 { queue.push(i * 20 + j).unwrap(); } })); } for handle in handles { handle.join().unwrap(); } while let Ok(val) = queue.pop() { println!("{}", val); } }
Несколько потоков добавляют элементы в ArrayQueue, а затем элементы извлекаются из очереди.
SegQueue
SegQueue — это lock-free очередь с динамическим расширением, которая подходит для сценариев с неопределенным количеством элементов. Пример:
use crossbeam_queue::SegQueue; use std::sync::Arc; use std::thread; fn main() { let queue = Arc::new(SegQueue::new()); let mut handles = vec![]; for i in 0..5 { let queue = Arc::clone(&queue); handles.push(thread::spawn(move || { for j in 0..20 { queue.push(i * 20 + j); } })); } for handle in handles { handle.join().unwrap(); } while let Some(val) = queue.pop() { println!("{}", val); } }
SegQueue используется аналогично ArrayQueue, но не ограничена фиксированным размером.
epoch-based garbage collection
Epoch-based garbage collection юзают для управления памятью без блокировок. Основная фича состоит в том, чтобы разделить время на epochs и отслеживать, когда каждый поток находится в активном состоянии.
Из чего состоит epoch-based GC:
-
Global epoch counter: глобальный счетчик эпох, который отслеживает текущую эпоху.
-
Thread-local epoch counter: локальный для потока счетчик эпох, который обновляется при каждом входе в критическую секцию.
-
Garbage lists: списки мусора для каждой epoch, в которые добавляются освобождаемые объекты.
Пример:
use crossbeam_epoch as epoch; use std::sync::atomic::{AtomicPtr, Ordering}; use std::ptr; struct Node { value: i32, next: AtomicPtr<Node>, } fn main() { let n1 = Box::into_raw(Box::new(Node { value: 1, next: AtomicPtr::new(ptr::null_mut()), })); let n2 = Box::into_raw(Box::new(Node { value: 2, next: AtomicPtr::new(n1), })); let head = AtomicPtr::new(n2); epoch::pin(|scope| { let h = head.load(Ordering::Relaxed, scope); unsafe { if !h.is_null() { println!("Value: {}", (*h).value); } } }); unsafe { drop(Box::from_raw(n1)); drop(Box::from_raw(n2)); } }
Создали простую lock-free структуру данных и юзаем epoch-based GC для управления памятью.
Примеры использования
Многопоточная обработка событий в серверной системе
В серверных приложениях нужно обрабатывать множество событий, поступающих от различных клиентов. С lock-free очередями можно избежать задержек, связанных с блокировками:
use crossbeam_queue::SegQueue; use std::sync::Arc; use std::thread; use std::time::Duration; struct Event { id: u32, payload: String, } fn main() { let event_queue = Arc::new(SegQueue::new()); let mut handles = vec![]; // поток, добавляющий события в очередь let producer_queue = Arc::clone(&event_queue); handles.push(thread::spawn(move || { for i in 0..100 { let event = Event { id: i, payload: format!("Event {}", i), }; producer_queue.push(event); thread::sleep(Duration::from_millis(10)); } })); // поток, обрабатывающий события из очереди let consumer_queue = Arc::clone(&event_queue); handles.push(thread::spawn(move || { while let Some(event) = consumer_queue.pop() { println!("Processing event id: {}", event.id); thread::sleep(Duration::from_millis(20)); } })); for handle in handles { handle.join().unwrap(); } }
Один поток добавляет события в lock-free очередь SegQueue, а другой поток извлекает и обрабатывает их.
Пул потоков для веб-сервера
Пул потоков в основном юзают в веб-серверах для обработки запросов клиентов. lock-free здесь тоже находят свое применение:
use crossbeam_queue::SegQueue; use std::sync::{Arc, Mutex}; use std::thread; use std::sync::mpsc::{self, Sender}; struct ThreadPool { workers: Vec<Worker>, sender: Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { fn new(size: usize) -> ThreadPool { let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); job(); }); Worker { id, thread: Some(thread), } } } fn main() { let pool = ThreadPool::new(4); for i in 0..8 { pool.execute(move || { println!("Processing request {}", i); }); } }
Пул потоков использует канал mpsc для передачи задач.
Concurrent Hash Map
С Concurrent Hash Map можно круто кэшировать данные, разделяемые между потоками, без необходимости блокировок:
use std::sync::Arc; use dashmap::DashMap; use std::thread; fn main() { let cache = Arc::new(DashMap::new()); let mut handles = vec![]; // потоки, добавляющие данные в кэш for i in 0..5 { let cache = Arc::clone(&cache); handles.push(thread::spawn(move || { for j in 0..10 { cache.insert(i * 10 + j, format!("Value {}", j)); } })); } // потоки, читающие данные из кэша for i in 0..5 { let cache = Arc::clone(&cache); handles.push(thread::spawn(move || { for j in 0..10 { if let Some(value) = cache.get(&(i * 10 + j)) { println!("Read from cache: {}", value); } } })); } for handle in handles { handle.join().unwrap(); } }
Юзаем DashMap, которая обеспечивает lock-free операции для вставки и чтения данных.
На моем опыте было замечено, что с lock-free структурами данных можно сократить время выполнения задач на 30-50% по сравнению с использованием мьютексов.
Больше про языки программирования эксперты OTUS рассказывают в рамках практических онлайн-курсов. С полным каталогом курсов можно ознакомиться по ссылке.
ссылка на оригинал статьи https://habr.com/ru/articles/821629/
Добавить комментарий