Основы фреймворка Combine для ML в Swift

от автора

Привет, Хабр!

Combine — это фреймворк, представленный Apple в 2019 году, предназначенный для работы с асинхронными потоками данных на платформе Swift. Основная задача Combine заключается в том, чтобы упростить и унифицировать обработку асинхронных событий, таких как сетевые запросы, таймеры, уведомления и пользовательские действия.

Фреймворк является частью экосистемы Swift и доступен начиная с iOS 13, macOS 10.15, watchOS 6.0 и tvOS 13. Если вы работаете с более ранними версиями, обновление Xcode до версии 11 или выше автоматом предоставит вам доступ к Combine.

В этой статье мы рассмотрим основы этого замечательного фреймворка.

После установки Xcode вы сможете использовать Combine в своих проектах, импортировав его следующим образом:

import Combine

Создание и комбинирование потоков данных с помощью Combine

В Combine всё начинается с Publisher — сущности, которая эмитирует значения с течением времени. Это может быть что угодно: массив чисел, результат сетевого запроса, или события пользовательского интерфейса.

Простейший пример Publisher — это массив, преобразованный в Publisher с помощью .publisher:

import Combine  let numbers = [1, 2, 3, 4, 5].publisher  numbers.sink { value in     print("Received value: \(value)") }

Массив numbers превращается в Publisher, который выдает каждое число последовательно. Метод sink — это Subscriber, который «подписывается» на Publisher и выводит каждый полученный элемент.

Если хочется создать собственный Publisher, можно использовать PassthroughSubject или CurrentValueSubject:

import Combine  let subject = PassthroughSubject<String, Never>()  subject.sink { value in     print("Received value: \(value)") }  subject.send("Hello") subject.send("Combine")

Здесь PassthroughSubject позволяет вручную выдавать значения, используя метод send, можно контролировать, когда и какие данные будут отправлены подписчикам.

Когда дело доходит до объединения данных из разных источников, есть несколько операторов: merge, combineLatest и zip.

  • merge объединяет несколько Publisher-ов в один, эмитируя значения по мере их поступления:

import Combine  let publisher1 = PassthroughSubject<Int, Never>() let publisher2 = PassthroughSubject<Int, Never>()  let merged = publisher1.merge(with: publisher2)  merged.sink { value in     print("Merged value: \(value)") }  publisher1.send(1) publisher2.send(2) publisher1.send(3)

Результат работы merge будет последовательность: 1, 2, 3, т.е. значения от обоих Publisher-ов объединяются в один поток.

  • combineLatest выдает пары значений от обоих Publisher-ов, когда оба источника данных предоставили хотя бы одно значение:

import Combine  let publisherA = PassthroughSubject<String, Never>() let publisherB = PassthroughSubject<String, Never>()  let combined = publisherA.combineLatest(publisherB)  combined.sink { value in     print("Combined value: \(value)") }  publisherA.send("Hello") publisherB.send("World") // Вывод: "Combined value: ("Hello", "World")" publisherA.send("Hi") // Вывод: "Combined value: ("Hi", "World")"

combineLatest ждет данные от обоих Publisher-ов и затем выдает их в виде пары, каждый раз, когда поступает новое значение от одного из них.

  • zip работает подобно combineLatest, но выдает пары только тогда, когда обе стороны готовы передать по одному значению:

import Combine  let pub1 = PassthroughSubject<Int, Never>() let pub2 = PassthroughSubject<String, Never>()  let zipped = pub1.zip(pub2)  zipped.sink { value in     print("Zipped value: \(value)") }  pub1.send(1) pub2.send("One") // Вывод: "Zipped value: (1, "One")" pub1.send(2) pub2.send("Two") // Вывод: "Zipped value: (2, "Two")"

Этот оператор хорош, когда необходимо синхронизировать данные из разных источников.

Допустим, есть приложение, которое принимает пользовательский ввод и результат предсказания модели, и вы хочется комбинировать эти данные для отображения их в UI:

import Combine  // Создаем два Publisher-а: один для ввода пользователя, другой для предсказаний let userInput = PassthroughSubject<String, Never>() let prediction = PassthroughSubject<String, Never>()  // Комбинируем данные, используя combineLatest let combinedData = userInput.combineLatest(prediction)  combinedData.sink { user, predicted in     print("User input: \(user), Model prediction: \(predicted)") }  // Эмитируем значения userInput.send("User says: Hello") prediction.send("Model predicts: Hi") // Вывод: "User input: User says: Hello, Model prediction: Hi"

combineLatest позволяет связать ввод пользователя с результатами работы ML-модели.

Трансформация данных в потоках

Оператор map является основным инструментом для трансформации данных в Combine. Он позволяет взять данные, выдаваемые Publisher-ом, и преобразовать их в новую форму, изменяя каждый элемент потока.

Представим, что есть поток чисел, и хочется умножить каждое из них на 2. Это легко реализуется с помощью map:

import Combine  let numbers = [1, 2, 3, 4, 5].publisher  let multipliedNumbers = numbers.map { $0 * 2 }  multipliedNumbers.sink { value in     print("Transformed value: \(value)") }

Результатом выполнения этого кода будет вывод:

Transformed value: 2 Transformed value: 4 Transformed value: 6 Transformed value: 8 Transformed value: 10

Оператор map подходит для предобработки данных перед их передачей в ML модель. Например, если нужно нормализовать входные данные перед подачей их в модель:

let rawInputs = [0.5, 0.75, 1.0, 1.25].publisher  let normalizedInputs = rawInputs.map { $0 / 2.0 }  normalizedInputs.sink { value in     print("Normalized value: \(value)") }

Оператор filter позволяет отфильтровывать значения, пропуская только те, которые удовлетворяют определенным условиям.

Рассмотрим пример, где нужно оставить в потоке только четные числа:

let numbers = [1, 2, 3, 4, 5, 6].publisher  let evenNumbers = numbers.filter { $0 % 2 == 0 }  evenNumbers.sink { value in     print("Filtered value: \(value)") }

Вывод:

Filtered value: 2 Filtered value: 4 Filtered value: 6

В контексте ML, filter может быть полезен для удаления выбросов или данных, не соответствующих определенным критериям. Например, если нужно исключить из входных данных все значения, превышающие допустимый предел:

let rawInputs = [0.5, 2.5, 1.0, 3.5].publisher  let validInputs = rawInputs.filter { $0 <= 2.0 }  validInputs.sink { value in     print("Valid input: \(value)") }

flatMap — это оператор, который раскрывает новый Publisher на основе каждого значения, выдаваемое исходным Publisher-ом.

Рассмотрим пример с сетевым запросом, который возвращает данные на основе пользовательского ввода:

import Foundation  let userInput = PassthroughSubject<String, Never>() let searchResults = userInput.flatMap { query in     URLSession.shared.dataTaskPublisher(for: URL(string: "https://api.example.com/search?q=\(query)")!)         .map { $0.data }         .decode(type: [String].self, decoder: JSONDecoder())         .catch { _ in Just([]) }  // Обрабатываем возможные ошибки }  searchResults.sink { results in     print("Search results: \(results)") }  userInput.send("apple")

flatMap используется для запуска сетевого запроса на основе пользовательского ввода. Каждый раз, когда пользователь вводит новый запрос, flatMap раскрывает новый Publisher, который затем обрабатывает результаты и возвращает их.

Примеры использования

Предположим, есть поток сырых данных, которые нужно нормализовать и отфильтровать перед подачей в модель:

let rawInputs = [1.5, 0.9, 2.5, 3.7, 1.8].publisher  let processedInputs = rawInputs     .filter { $0 < 3.0 } // Оставляем только допустимые значения     .map { $0 / 3.0 } // Нормализуем данные  processedInputs.sink { value in     print("Processed input: \(value)") }

После того как модель выдала свои предсказания, возможно, потребуется постобработка данных, чтобы они стали пригодными для отображения в UI или дальнейшей аналитики:

let modelPredictions = [0.2, 0.8, 1.0, 0.4].publisher  let filteredPredictions = modelPredictions     .filter { $0 > 0.5 } // Оставляем только уверенные предсказания     .map { $0 * 100.0 } // Преобразуем в проценты  filteredPredictions.sink { value in     print("Filtered prediction: \(value)%") }

Управление ошибками и отложенная обработка данных

В Combine ошибки обрабатываются с помощью операторов catch и retry.

Оператор catch позволяет перехватывать ошибки, возникающие в потоке данных, и предоставлять альтернативный Publisher для продолжения работы.

Пример:

import Combine  struct MyError: Error {}  let faultyPublisher = Fail<Int, MyError>(error: MyError())  let safePublisher = faultyPublisher     .catch { _ in Just(0) } // В случае ошибки возвращаем значение 0     .sink { value in         print("Received value: \(value)")     }  // Вывод: "Received value: 0"

ИспользуемFail для эмуляции ошибки, а затем обработали ее с помощью catch, который возвращает безопасное значение.

Оператор retry позволяет автоматически повторять выполнение потока данных, если в нем произошла ошибка.

Пример:

import Combine  var attemptCount = 0  let retryingPublisher = Deferred {     attemptCount += 1     return attemptCount < 3 ? Fail(error: URLError(.badServerResponse)) : Just("Success") } .retry(3) // Повторяем до трех раз .sink(     receiveCompletion: { completion in         switch completion {         case .finished:             print("Completed successfully")         case .failure(let error):             print("Failed with error: \(error)")         }     },     receiveValue: { value in         print("Received value: \(value)")     } )  // Вывод: "Received value: Success"

Здесь retry пытается выполнить поток данных до трех раз. На третьей попытке поток завершается успешно.

Работа с асинхронными данными часто требует управления частотой обработки событий, особенно если поток данных поступает быстро и может перегрузить систему. Combine предоставляет два оператора для этой задачи: debounce и throttle.

debounce — это оператор, который откладывает обработку событий до тех пор, пока не пройдет определенное время без поступления новых данных.

Пример:

import Combine import Foundation  let searchTextPublisher = PassthroughSubject<String, Never>()  let debouncedSearch = searchTextPublisher     .debounce(for: .milliseconds(500), scheduler: RunLoop.main)     .sink { value in         print("Search query: \(value)")     }  searchTextPublisher.send("S") searchTextPublisher.send("Sw") searchTextPublisher.send("Swi") searchTextPublisher.send("Swif") searchTextPublisher.send("Swift") // Через 500 мс после последнего ввода: "Search query: Swift"

Здесь debounce откладывает отправку данных до тех пор, пока пользователь не закончит ввод текста, избегая излишних запросов.

throttle позволяет пропускать события только через определенные интервалы времени, ограничивая частоту их обработки.

Пример:

import Combine import Foundation  let eventPublisher = PassthroughSubject<Void, Never>()  let throttledEvents = eventPublisher     .throttle(for: .seconds(1), scheduler: RunLoop.main, latest: true)     .sink {         print("Event received")     }  for _ in 1...5 {     eventPublisher.send()     Thread.sleep(forTimeInterval: 0.3) // Эмуляция частого срабатывания события }  // Вывод: "Event received" будет напечатано только один раз в секунду

throttle в этом примере пропускает только одно событие в секунду, игнорируя остальные, что позволяет эффективно управлять нагрузкой.

Рассмотрим сценарий, где идет работа с потоком данных из нескольких источников, например, с сенсоров и сервера, и нужно управлять ошибками и задержками:

import Combine import Foundation  enum SensorError: Error {     case sensorFailure }  let sensorDataPublisher = PassthroughSubject<Int, SensorError>() let serverDataPublisher = PassthroughSubject<Int, URLError>()  let combinedPublisher = Publishers.CombineLatest(sensorDataPublisher, serverDataPublisher)     .retry(2) // Пытаемся заново в случае ошибки     .debounce(for: .milliseconds(300), scheduler: RunLoop.main) // Избегаем частой обработки данных     .catch { _ in Just((0, 0)) } // В случае ошибки возвращаем безопасное значение     .sink { sensorData, serverData in         print("Sensor: \(sensorData), Server: \(serverData)")     }  sensorDataPublisher.send(100) serverDataPublisher.send(200)  // Эмулируем ошибку сенсора sensorDataPublisher.send(completion: .failure(.sensorFailure))  serverDataPublisher.send(300)

Используем CombineLatest для объединения данных от сенсоров и сервера, обрабатываем ошибки с помощью retry, избегаем частой обработки данных с помощью debounce, и, наконец, безопасно обрабатываем ошибки с помощью catch.

Подробнее с библиотекой Combine можно ознакомиться здесь.


Актуальна тема подготовки данных в Pandas? Приходите на открытый урок 19 августа, который пройдет в рамках специализации Machine Learning.

В результате вебинара вы узнаете о методах подготовки данных и научитесь чистить данные при помощи библиотеки Pandas. Записывайтесь на урок по ссылке.


ссылка на оригинал статьи https://habr.com/ru/articles/835832/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *