Разбавляем асинхронное программирование функциональным на Scala

от автора

Приветствую! В этой статье будет показано, как, имея на руках обычные Future-ы, сделать в scala подобие корутин и асинхронные stream-ы. Этакий небольшой туториал по функциональному программированию.

Что это и зачем

Что такое Future человеческим языком

Future — это сущность, описывающая результат некоторых вычислений, который мы получим не сразу, но в будущем. Но есть одна особенность: зачастую мы, не зная еще результата, точно знаем, что мы с ним будем делать. Например, мы попросили у сервера какой-то конфиг, и теперь у нас есть Future[Config]. Сам конфиг мы еще не получили, но точно знаем, что, когда получим, то достанем из него адрес и по этому адресу попросим у сервера картинку (config => Future[Image]). И Future[Config] способна изменяться таким образом, чтобы мы вместо конфига и потом картинки могли получить сразу картинку. Сущности, способные комбинироваться таким способом, называются монадами.

К сожалению, простое последовательное комбинирование 2х и более асинхронных операций (загрузить конфиг, а потом картинку по адресу из конфига как пример) — это все, на что способны обычные Future-ы в качестве монад. Они не позволяют ни сохранять состояние, ни делать циклы из асинхронных операций, ни выдавать несколько (или бесконечно много) значений. Вот этими недостатками мы сейчас и займемся.

Давайте для определенности представим некий виджет. Он ждет конфиг, который обновляется с определенной периодичностью, загружает по адресу из конфига значение (например, температуру), и рисует на экране текущее значение, минимум, максимум, среднее и так далее. И все это делает в цикле, да еще и асинхронно.

Применив знания из этой статьи, мы сможем этот процесс описать примерно так:

Код

// Про 'FState' - далее, пока же просто примем, что это - такая необычная Future def getNextConfig: FState[Config] def getTemperature(from: String): FState[Int]  case class State(temperature: Int, sumTemp: Long, count: Int) {   def isGood = ... }  // Как видим, получается единый асинхронный алгоритм с состоянием,  // которое извне этого алгоритма не видно val handle =    while_ ( _.isGood)   {  for (         config <- getNextConfig();         if (config.isDefined);  // пустой конфиг - прекращаем выполнение         nextValue <- getTemperature(config().source);  // грузим значение температуры         state <- gets[State];  // тут мы берем текущее состояние         newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);         _ <- puts(newState);  // .. и меняем его         _ <- runInUiThread { drawOnScreen(newState) }   ) yield() } 

Или вот так:

Код

val configs: AsyncStream[Config] = ... // получаем откуда-то stream конфигов  def getTemperature(from: String): FState[Int]  case class State(temperature: Int, sumTemp: Long, count: Int)  // Получается то же самое, только вместо зависимости 'getNextConfig' // мы, по сути, передаем сами данные - stream из конфигов val handle =    foreach(configs) {     config => for (         nextValue <- getTemperature(config().source);  // грузим значение температуры         state <- gets[State];  // тут мы берем текущее состояние         newState = State(nextValue, state.sumTemp + nextValue, state.count + 1);         _ <- puts(newState);  // .. и меняем его         _ <- runInUiThread { drawOnScreen(newState) }     ) yield()     } 

Всех, кто заинтересовался, прошу под кат.

Stateful asynchronous computations

Это такая Future, которая позволяет сохранять и менять состояние псевдоалгоритма внутри for-конструкции, те самые gets[State] и puts[State]. И то, что придает этому самому псевдоалгоритму некую корутинообразность.

Давайте рассмотрим вот такую вот интересную сущность:

// S - тип внешнего состояния, A - тип результата case class FState[S, +A](func: S => Future[(A, S)]) {   def apply(s: S) = func(s) } 

Как видим, это простая обертка над функцией, принимающей текущее состояние и возвращающей Future на результат в паре с новым состоянием. Вид этой сущности получен простым комбинированием монад Future и State (а то, что мы сейчас делаем, называется монадным трансформером).

Давайте научим эту сущность быть монадой. В принципе, нам достаточно определить у этой сущности операции unit и flatMap (а еще map, которая выражается через первые две), но мы пойдем сразу тернистым путем scalaz, и получим бонусом целую алгебру операций, определенную в терминах вот этих двух.

class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {   type F[X] = FState[S, X]    override def point[A](a: => A): F[A]   override def bind[A, B](m: F[A])(f: A => F[B]): F[B] } 

ответ

class FStateMonad[S] extends Monad[({ type f[X] = FState[S, X]})#f] {   type F[X] = FState[S, X]    override def point[A](a: => A): F[A] = FState((s: S) => Future((a, s)))   override def bind[A, B](m: F[A])(f: A => F[B]): F[B] =     FState((s: S) => m(s) flatMap { pair => f(pair._1)(pair._2) }) } 

Например, мы только что бесплатно получили вот такую совершенно великолепную операцию:

// Цикл с монадным условием и монадным телом - красота! def whileM_[A](p: F[Boolean], body: => F[A]): F[Unit] 

А как же менять состояние внутри псевдоалгоритма? Мы можем заметить, что в комбинаторе bind оно не фигурирует. Пишем:

def gets[S](): FState[S, S] def puts[S](news: S): FState[S, S] 

ответ

// Как видим, чтобы нам получить текущее состояние, его достаточно  // просто "поднять" в значение. def gets[S](): FState[S, S] = FState((s: S) => Future((s, s)))  // А изменить состояние еще проще def puts[S](news: S): FState[S, S] = FState((_: S) => Future((news, news))) 

Вот, в принципе, и все! Теперь мы можем написать что-то вроде:

implicit val m = new FStateMonad[Int] // состояние - обычный Int  // И эта жуть просто считает до 10... val algo = for(    _ <- m.whileM_(gets[Int] map (_ < 10), for(     i <- gets[Int];     _ <- puts(i + 1)   ) yield(()));   v1 <- gets[Int]  ) yield (v1)  // algo(0)() should be ((10, 10)) 

Но мы можем написать себе столько синтаксического сахара, сколько захотим, и закончить примерно на этом:

implicit val m = new FStateMonad[Int] // состояние - обычный Int  val algo = for(    // почти обычный for, с условием, модификатором состояния и телом   _ <- m.forM_ ( _ < 10, _ + 1) {     // тут любой FState   };   v1 <- gets[Int]  ) yield (v1)  // algo(0)() should be ((10, 10)) 

Асинхронные stream-ы

Это как раз такая Future-а, которая способна возвращать больше одного значения асинхронно. Во втором примере в начале статьи мы просто по этому stream-у итерируемся stateful-псевдоалгоритмом. Однако со stream-ом можно делать еще несколько разных интересностей, но давайте по порядку. Начнем с построения stream-а.

Что такое stream? Упрощенно — список с хвостом, вычисляемым лениво. Вследствие этого может быть бесконечным. Наш асинхронный AsyncStream будет выглядеть примерно так:

class Pair[A, B](fp: A, sp: => B) {   val first = fp   lazy val second = sp }  object Pair {   def apply[A, B](first:  A, second: => B) = new Pair[A, B](first, second) }  // Конец stream-а обозначим как Future(null), это обязательно должна быть // какая-то конечная Future-а, возвращающая какое-то пустое значение. // Мы не будем возиться тут с Option-ом, а возьмем просто null. case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) 

Итак, вроде бы все просто: асинхронно возвращаем значение и хвост, а ленивость хвоста должна уберечь нас от stackoverflow при пользовании бесконечными stream-ами.

Но stream-ы обладают одной очень притягательной особенностью: их можно сворачивать! Пишем:

// Почти стандартный foldLeft, только возвращает Future[B] // - другого мы ничего вернуть не сможем def foldLeft[B](start: B)(f: (B, A) => B): Future[B] 

ответ

def foldLeft[B](start: B)(f: (B, A) => B): Future[B] = {   // Стандартно, определим функцию с аккумулятором, которой   // потом передадим data   def impl(d: Future[Pair[A, AsyncStream[A]]], acc: Future[B]): Future[B] =     d flatMap (pair => {       if (pair eq null) acc       else impl(pair.second.data, acc map (b => f(b, pair.first)))     })     impl(data, Future(start)) } 

Можно еще вот так свернуть:

def flatten : Future[List[A]] 

ответ

def flatten : Future[List[A]] =     foldLeft[List[A]](Nil)((list, el) => el :: list) map (_.reverse) 

Еще парочка полезных функций, в основном для работы с бесконечными stream-ами:

def takeWhile(p: A => Boolean): AsyncStream[A] def take(n: Int): AsyncStream[A] 

ответ

def takeWhile(p: A => Boolean): AsyncStream[A] =   new AsyncStream[A](data map (pair => {      if (pair eq null) null      else if (!p(pair.first)) null      else Pair(pair.first, pair.second.takeWhile(p))    }))  def take(n: Int): AsyncStream[A] =     if (n <= 0) nil     else new AsyncStream[A](data map (pair => {       if (pair eq null) null       else Pair(pair.first, pair.second.take(n - 1))     })) 

Сворачивать-то мы научились, а вот строить такой stream пока что неудобно. Исправим это и напишем универсальный генератор stream-а:

// 'gen' и является функцией-генератором, она должна возвратить  // Future(null), если больше значений сгенерировать не получается def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A] 

ответ

def genAsyncStream[S,A](start: S)(gen: S => Future[(A, S)]): AsyncStream[A] =   new AsyncStream[A](     gen(start) match {       case _: NoFuture => Future(null)       case future => future map (pair => { // Future[Pair[A, AsyncStream]]         if (pair eq null) null         else Pair(pair._1, genAsyncStream(pair._2)(gen))       })}) 

Кстати, с помощью foldLeft + genAsyncStream stream-ы можно копировать.

Stream-ы можно соединять:

def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] 

ответ

def concat[A](s1: AsyncStream[A], s2: AsyncStream[A]): AsyncStream[A] =   new AsyncStream[A](s1.data flatMap (pair => {     if (pair eq null) s2.data     else Future(Pair(pair.first, concat(pair.second, s2)))   })) 

А еще асинхронный stream — это тоже монада:

class AsyncStreamMonad extends Monad[AsyncStream] {   override def point[A](a: => A): AsyncStream[A]   override def bind[A, B](     ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] } 

ответ

class AsyncStreamMonad extends Monad[AsyncStream] {   override def point[A](a: => A): AsyncStream[A] = unit(a)   override def bind[A, B](     ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =     new AsyncStream[B](ma.data flatMap (pair => {       if (pair eq null) Future(null)       else f(pair.first).data map (         pair2 => Pair(pair2.first, concat(pair2.second, bind(pair.second)(f))))     })) } 

В основном, построение асинхронного stream-а на этом можно закончить.

FState + AsyncStream = ?

На самом деле, они прекрасно уживаются. Давайте посмотрим на функцию-генератор в genAsyncStream, ничего не напоминает? Да это же FState!

А теперь давайте научимся итерироваться по stream-у:

  def foreach[A, S]     (stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] 

ответ

// Сделаем это foldLeft-ом def foreach[A, S]   (stream: AsyncStream[A])(f: A => FState[S, Any]): FState[S, Unit] =   FState(s => {     stream.foldLeft(Future(s))(     (futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2))).flatten.map( ((), _) )   }) 

То есть мы совершенно спокойно можем написать генератор, который yield-ит значения в асинхронный stream, потом этот stream куда-то передать, и по нему уже итерироваться другим алгоритмом — довольно удобно, имхо, когда надо передать данные из одного модуля программы в другой и не хочется внедрять зависимости.

Итог

Как итог, мы получили пару сущностей, расширяющие возможности обычных Future-ов. Надеюсь, было интересно.
Спасибо за внимание)

Код здесь.

ссылка на оригинал статьи https://habrahabr.ru/post/281346/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *