Изучаем английский с Scala на Future и Actor

от автора

Решил тут я подтянуть свой английский язык. В частности, захотелось значительно расширить словарный запас. Я знаю, что существует масса программ, которые в игровой форме помогают это сделать. Загвоздка в том, что я не люблю геймфикацию. Предпочитаю по старинке. Листочек бумаги где таблица со словами, транскрипцией и переводом. И учим его учим. И проверяем свои знания, например, закрывая столбец с переводом. В общем, как я учил это в университете.

Прослышал я про то, что существует 3000 наиболее часто используемых слов, подобранных на OxfordDictionary сайте. Вот тут этот список слов: www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B Ну а перевод на русский я решил брать отсюда: www.translate.ru/dictionary/en-ru Одна только проблема, все находиться на этих сайтах ну совсем не в том формате, который можно распечатать и учить. В итоге родилась идея это все запрограммировать. Но сделать это не как последовательный алгоритм, а все распаралелить. Что бы выкачивание и парсинг всех слов занял не (3000 слов * 2 сайта) / 60 секунд = 100 минут. Это если давать по 1 секунде на выкачивание и распарсивание страницы для извлечения перевода и транскрипции (в реальности думаю это в 3 раза дольше, пока соединение откроем, пока закроем и тд и тп).

image

Задачу я разбил сразу на два крупных блока. Первый блок, это операции ввода/вывода блокирующие — выкачивание страницы с сайта. Второй блок, это операции вычислительные, не блокирующие, но нагружающие CPU: парсинг страницы для извлечения перевода и транскрипции и добавление в словарь результатов парсинга.

Блокирующие операции я решил делать в пуле нитей, используя Future от Scala. Вычислительные задачи, решил раскидать на 3 актера Akka. Применяя методику TDD, cначала я написал тест к своим кирпичикам будущего приложения.

class Test extends FlatSpec with Matchers {    "Table Of Content extractor" should "download and extract content from Oxford Site" in {     val content:List[String] = OxfordSite.getTableOfContent     content.size should be (10)     content.find(_ == "A-B") should be (Some("A-B"))     content.find(_ == "U-Z") should be (Some("U-Z"))   }    "Words list extractor" should "download words from page" in {     val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 1)     val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)     wordsTry should be a 'success     val words = wordsTry.get     words.get.find(_ == "abandon") should be (Some("abandon"))    }   "Words list extractor" should "return None from empty page" in {     val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 999)     val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)     wordsTry should be a 'success     val words = wordsTry.get     words should be(None)    }    "Russian Translation" should "download translation and parse" in {     val page: Future[Try[String]] =  LingvoSite.getPage("test")     val pageResultTry: Try[String]= Await.result(page,60 seconds)     pageResultTry should be a 'success     val pageResult = pageResultTry.get     pageResult.contains("тест") should be(true)     LingvoSite.parseTranslation(pageResult).get should be("тест")   }      "English Translation" should "download translation and parse" in {     val page: Future[Try[String]] =  OxfordSite.getPage("test")     val pageResultTry: Try[String] = Await.result(page,60 seconds)     pageResultTry should be a 'success     val pageResult = pageResultTry.get     pageResult.contains("examination") should be(true)     OxfordSite.parseTranslation(pageResult).get should be(("test", "an examination of somebody’s knowledge or ability, consisting of questions for them to answer or activities for them to perform"))    }  } 

Обратите внимание. Функции, которые могут вернуть результат вычислений имеют Try[…]. Те либо Success результат или Failure и эксепшен. Функции, которые будут часто вызывать и имеют блокирующие i/o операции имеют результат, как Future[Try[…]]. Те при вызове функции сразу возвращается Future в которой идут долгие i/o операции. Притом они идут внутри Try и могут завершится с ошибок (например соединение порвалось).

Само приложение инициализируется в Top3000WordsApp.scala. Поднимается система актеров. Создаются актеры. Запускается парсинг списка слов, который в параллель запускает выкачивание английских и русских страниц с транскрипцией и переводом. В случае успешной скачки страниц, срабатывает передача содержимое страниц актерам для парсинга, извлекающих перевод и транскрипцию. Результат перевода актеры передают конечному актеру-словарю, который акамулирует все результаты в одном месте. И по нажатию enter, система актеров идет в shutdown. И актер DictionaryActor, получая сигнал об этом, сохраняет собраный словарь в файл dictionaty.txt

object Top3000WordsApp extends App {     val system = ActorSystem("Top3000Words")   val dictionatyActor = system.actorOf(Props[DictionaryActor], "dictionatyActor")   val englishTranslationActor = system.actorOf(Props(classOf[EnglishTranslationActor], dictionatyActor), "englishTranslationActor")   val russianTranslationActor = system.actorOf(Props(classOf[RussianTranslationActor], dictionatyActor), "russianTranslationActor")   val mapGetPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))   val mapGetWordsThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))     start()    scala.io.StdIn.readLine()   system.terminate()   def start() = {      import concurrent.ExecutionContext.Implicits.global      Future {       OxfordSite.getTableOfContent.par.foreach(letterGroup => {         getWords(letterGroup, 1)       })      }   }     def getWords(letterGroup: String, pageNum: Int): Unit = {     implicit val executor = mapGetWordsThreadExecutionContext      OxfordSite.getWordsFromPage(letterGroup, pageNum).map(tryWords => {       tryWords match {         case Success(Some(words)) => words.par.foreach(word => {             parse(word,letterGroup,pageNum)         })         case Success(None) => Unit         case Failure(ex) => println(ex.getMessage)       }     })    }     def parse(word: String, letterGroup: String, pageNum: Int)= {      implicit val executor = mapGetPageThreadExecutionContext     OxfordSite.getPage(word).map(tryEnglishPage => {       tryEnglishPage match {         case Success(englishPage) => {           englishTranslationActor ! (word, englishPage)           getWords(letterGroup, pageNum + 1)         }         case Failure(ex) => println(ex.getMessage)       }     })     LingvoSite.getPage(word).map(_ match {       case Success(russianPage) => {         russianTranslationActor !(word, russianPage)       }       case Failure(ex) => println(ex.getMessage)     })   }  }  

Обратите внимание, что алгоритм разбит на start, getWords, parse функции. Это сделано из-за того, что для каждой фазы задачи требуется свой пул нитей, который передается неявно, как ThreadExecutionContext. Сначала, у меня была всего одна функция getWords, для рекурсивного вызова. Но все работало очень медленно, так как на верхнем уровне алгоритма распаралеливание выжирало весь пул нитей и в самом низу были вечные ожидания, когда же мне дадут свободную нить, что бы поработать. А как раз в низу самое большое число операций.

Вот реализация скачивания и парсинга с сайтов.

object OxfordSite {    val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))    def parseTranslation(content: String): Try[(String, String)] = {     Try {       val browser = new Browser       val doc = browser.parseString(content)       val spanElement: Element = doc >> element(".phon")       val str = Jsoup.parse(spanElement.toString).text()       val transcription = str.stripPrefix("BrE//").stripSuffix("//").trim       val translation = doc >> text(".def")       (transcription,translation)     }   }    def getPage(word: String): Future[Try[String]] = {     implicit val executor = getPageThreadExecutionContext     Future {       Try {         val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/definition/english/" + (word.replace(' ','-')) + "_1")         html.mkString       }     }   }    def getWordsFromPage(letterGroup: String, pageNum: Int): Future[Try[Option[List[String]]]] = {     import ExecutionContext.Implicits.global      Future {       Try {         val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com" +           "/wordlist/english/oxford3000/Oxford3000_" + letterGroup + "/?page=" + pageNum)         val page = html.mkString         val browser = new Browser         val doc = browser.parseString(page)         val ulElement: Element = doc >> element(".wordlist-oxford3000")         val liElements: List[Element] = ulElement >> elementList("li")         if (liElements.size > 0) Some(liElements.map(_ >> text("a")))         else None       }     }   }    def getTableOfContent: List[String] = {      val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B/")     val page = html.mkString     val browser = new Browser     val doc = browser.parseString(page)     val ulElement: Element = doc >> element(".hide_phone")     val liElements: List[Element] = ulElement >> elementList("li")     List(liElements.head >> text("span")) ++ liElements.tail.map(_ >> text("a"))   }  }   object LingvoSite {   val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))     def parseTranslation(content: String): Try[String] = {     Try {        val browser = new Browser       val doc = browser.parseString(content)       val spanElement: Element = doc >> element(".r_rs")       spanElement >> text("a")     }   }    def getPage(word: String): Future[Try[String]] = {     implicit val executor = getPageThreadExecutionContext      Future {       Try {          val html = Source.fromURL("http://www.translate.ru/dictionary/en-ru/" + java.net.URLEncoder.encode(word,"UTF-8"))         html.mkString       }     }   }  }  

Структуры данных с которыми работают актеры.

case class Word (word: String, transcription: Option[String] = None, russianTranslation:Option[String] = None, englishTranslation: Option[String] = None) case class RussianTranslation(word:String, translation: String) case class EnglishTranslation(word:String, translation: String) case class Transcription(word:String, transcription: String)  

Актеры, которые принимают на входе скачанные страницы для парсинга и пересылают перевод и транскрипцию актеру DictionaryActor

class EnglishTranslationActor (dictionaryActor: ActorRef) extends Actor {   println("EnglishTranslationActor")     def receive = {     case (word: String, englishPage: String) => {       OxfordSite.parseTranslation(englishPage) match {         case Success((transcription, translation)) => {           dictionaryActor ! EnglishTranslation(word,translation)           dictionaryActor ! Transcription(word,transcription)         }         case Failure(ex) => {           println(ex.getMessage)         }       }     }   }  }   class RussianTranslationActor  (dictionaryActor: ActorRef) extends Actor {   println("RussianTranslationActor")     def receive = {     case (word: String, russianPage: String) => {       LingvoSite.parseTranslation(russianPage) match {         case Success(translation) => {             dictionaryActor ! RussianTranslation(word, translation)         }         case Failure(ex) => {           println(ex.getMessage)         }       }     }   }    }  

Актер который накапливает в себе словарь с переводами и транскрипцией и после shutdown системы актеров записывает весь словарь в dictionary.txt

class DictionaryActor extends Actor {   println("DictionaryActor")     override def postStop(): Unit = {     println("DictionaryActor postStop")     val fileText = DictionaryActor.words.map{case (_, someWord)=> {       val transcription = someWord.transcription.getOrElse(" ")       val russianTranslation = someWord.russianTranslation.getOrElse(" ")       val englishTranslation = someWord.englishTranslation.getOrElse(" ")       List(someWord.word, transcription , russianTranslation , englishTranslation).mkString("|")     }}.mkString("\n")     scala.tools.nsc.io.File("dictionary.txt").writeAll(fileText)     println("dictionary.txt saved")     System.exit(0)    }    def receive = {     case Transcription(wordName, transcription) => {       val newElement = DictionaryActor.words.get(wordName) match {         case Some(word) => word.copy(transcription = Some(transcription))         case None =>  Word(wordName,transcription = Some(transcription))       }       DictionaryActor.words += wordName -> newElement       println(newElement)     }     case RussianTranslation(wordName, translation) => {       val newElement = DictionaryActor.words.get(wordName) match {         case Some(word) => word.copy(russianTranslation = Some(translation))         case None =>  Word(wordName,russianTranslation = Some(translation))       }       DictionaryActor.words += wordName -> newElement       println(newElement)     }     case EnglishTranslation(wordName, translation) => {       val newElement = DictionaryActor.words.get(wordName) match {         case Some(word) => word.copy(englishTranslation = Some(translation))         case None =>  Word(wordName,englishTranslation = Some(translation))       }       DictionaryActor.words += wordName -> newElement       println(newElement)     }   } }   object DictionaryActor {   var words = scala.collection.mutable.Map[String, Word]() } 

Какие выводы? На моем Mac Book Pro этот скрипт работал в течение примерно 1 часа, пока я писал эту статью. Я его прервал нажав enter и вот какой результат:

bash-3.2$ cat ./dictionary.txt |wc -l     1809 

Потом, я еще раз запустил скрипт и оставил на несколько часов. Когда вернулся, то у меня был процессор загружен 100% и были ошибки в консоле про гарбаж коллектор, по нажатию на enter моя программа не смогла сохранить результат своей работы в файл. Диагноз такой, писать на Future и par.map или par.foreach, конечно красиво и удобно, но реально очень тяжело понять как на самом деле на уровне нитей это все работает и где же узкое горлышко бутылки. В итоге я планирую все переписать на актеры. Притом, буду использовать пулы актеров. Что бы, например, 4 актера выкачивало и парсило страницы со списками слов, 18 актеров выкачивало страницы с переводами, 4 актера парсило страницы извлекая переводы и транскрипцию, ну и 1 актер складывал все в словарь.

Текущая реализация в бранче v0.1 github.com/evgenyigumnov/top3000words/tree/v0.1 Версия где все переписано на актеры с пулами будет в бранче v0.2, ну и в master, чуть позже. Может есть у кого соображения, что я делал не так, в текущей версии? Ну и может советы подкините по новой версии?

Проект на гитхабе доступен: github.com/evgenyigumnov/top3000words

Запустить тесты проекта: sbt test
Запустить приложение: sbt run
Ну и как надоест ждать, нажать enter и ознакомиться с содержимым файла dectionary.txt в текущей папке

ссылка на оригинал статьи http://habrahabr.ru/post/273431/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *