Тестирование в Apache Spark Structured Streaming

от автора

Введение

На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.

Все примеры используют: Apache Spark 3.0.1.

Подготовка

Необходимо установить:

  • Apache Spark 3.0.x
  • Python 3.7 и виртуальное окружение для него
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • В примерах для Scala используется версия 2.12.10.

  1. Загрузить Apache Spark
  2. Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7

Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.

SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7 PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;

Тесты

Пример с scikit-learn

При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.

Для написания тестов будет использоваться следующий пример: LinearRegression.

Итак, пусть код для тестирования использует следующий "шаблон" для Python:

class XService:      def __init__(self):         # Инициализация      def train(self, ds):         # Обучение      def predict(self, ds):         # Предсказание и вывод результатов 

Для Scala шаблон выглядит соответственно.

Полный пример:

from sklearn import linear_model  class LocalService:      def __init__(self):         self.model = linear_model.LinearRegression()      def train(self, ds):         X, y = ds         self.model.fit(X, y)      def predict(self, ds):         r = self.model.predict(ds)         print(r) 

Тест.

Импорт:

import unittest import numpy as np

Основной класс:

class RunTest(unittest.TestCase):

Запуск тестов:

if __name__ == "__main__":     unittest.main()

Подготовка данных:

X = np.array([     [1, 1],  # 6     [1, 2],  # 8     [2, 2],  # 9     [2, 3]  # 11 ]) y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3

Создание модели и обучение:

service = local_service.LocalService() service.train((X, y))

Получение результатов:

service.predict(np.array([[3, 5]])) service.predict(np.array([[4, 6]]))

Ответ:

[16.] [19.]

Все вместе:

import unittest import numpy as np  from spark_streaming_pp import local_service  class RunTest(unittest.TestCase):     def test_run(self):         # Prepare data.         X = np.array([             [1, 1],  # 6             [1, 2],  # 8             [2, 2],  # 9             [2, 3]  # 11         ])         y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3         # Create model and train.         service = local_service.LocalService()         service.train((X, y))         # Predict and results.         service.predict(np.array([[3, 5]]))         service.predict(np.array([[4, 6]]))         # [16.]         # [19.]  if __name__ == "__main__":     unittest.main()

Пример с Spark и Python

Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.

Инициализация:

self.service = LinearRegression(maxIter=10, regParam=0.01) self.model = None

Обучение:

self.model = self.service.fit(ds)

Получение результатов:

transformed_ds = self.model.transform(ds) q = transformed_ds.select("label", "prediction").writeStream.format("console").start() return q

Все вместе:

from pyspark.ml.regression import LinearRegression  class StructuredStreamingService:     def __init__(self):         self.service = LinearRegression(maxIter=10, regParam=0.01)         self.model = None      def train(self, ds):         self.model = self.service.fit(ds)      def predict(self, ds):         transformed_ds = self.model.transform(ds)         q = transformed_ds.select("label", "prediction").writeStream.format("console").start()         return q 

Сам тест.

Обычно в тестах можно использовать данные, которые создаются прямо в тестах.

train_ds = spark.createDataFrame([     (6.0, Vectors.dense([1.0, 1.0])),     (8.0, Vectors.dense([1.0, 2.0])),     (9.0, Vectors.dense([2.0, 2.0])),     (11.0, Vectors.dense([2.0, 3.0])) ],     ["label", "features"] )

Это очень удобно и код получается компактным.

Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.

def test_stream_read_options_overwrite(self):     bad_schema = StructType([StructField("test", IntegerType(), False)])     schema = StructType([StructField("data", StringType(), False)])     df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \         .schema(bad_schema)\         .load(path='python/test_support/sql/streaming', schema=schema, format='text')     self.assertTrue(df.isStreaming)     self.assertEqual(df.schema.simpleString(), "struct<data:string>")

И так.

Создается контекст для работы:

spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark.sparkContext.setLogLevel("ERROR")

Подготовка данных для обучения (можно сделать обычным способом):

train_ds = spark.createDataFrame([     (6.0, Vectors.dense([1.0, 1.0])),     (8.0, Vectors.dense([1.0, 2.0])),     (9.0, Vectors.dense([2.0, 2.0])),     (11.0, Vectors.dense([2.0, 3.0])) ],     ["label", "features"] )

Обучение:

service = structure_streaming_service.StructuredStreamingService() service.train(train_ds)

Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.

def extract_features(x):     values = x.split(",")     features_ = []     for i in values[1:]:         features_.append(float(i))     features = Vectors.dense(features_)     return features  extract_features_udf = udf(extract_features, VectorUDT())  def extract_label(x):     values = x.split(",")     label = float(values[0])     return label  extract_label_udf = udf(extract_label, FloatType())  predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \     .withColumn("features", extract_features_udf(col("value"))) \     .withColumn("label", extract_label_udf(col("value")))  service.predict(predict_ds).awaitTermination(3)

Ответ:

15.96699 18.96138

Все вместе:

import unittest import warnings  from pyspark.sql import SparkSession from pyspark.sql.functions import col, udf from pyspark.sql.types import FloatType from pyspark.ml.linalg import Vectors, VectorUDT  from spark_streaming_pp import structure_streaming_service  class RunTest(unittest.TestCase):     def test_run(self):         spark = SparkSession.builder.enableHiveSupport().getOrCreate()         spark.sparkContext.setLogLevel("ERROR")         # Prepare data.         train_ds = spark.createDataFrame([             (6.0, Vectors.dense([1.0, 1.0])),             (8.0, Vectors.dense([1.0, 2.0])),             (9.0, Vectors.dense([2.0, 2.0])),             (11.0, Vectors.dense([2.0, 3.0]))         ],             ["label", "features"]         )         # Create model and train.         service = structure_streaming_service.StructuredStreamingService()         service.train(train_ds)          # Predict and results.          def extract_features(x):             values = x.split(",")             features_ = []             for i in values[1:]:                 features_.append(float(i))             features = Vectors.dense(features_)             return features          extract_features_udf = udf(extract_features, VectorUDT())          def extract_label(x):             values = x.split(",")             label = float(values[0])             return label          extract_label_udf = udf(extract_label, FloatType())          predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \             .withColumn("features", extract_features_udf(col("value"))) \             .withColumn("label", extract_label_udf(col("value")))          service.predict(predict_ds).awaitTermination(3)          # +-----+------------------+         # |label|        prediction|         # +-----+------------------+         # |  1.0|15.966990887541273|         # |  2.0|18.961384020443553|         # +-----+------------------+      def setUp(self):         warnings.filterwarnings("ignore", category=ResourceWarning)         warnings.filterwarnings("ignore", category=DeprecationWarning)  if __name__ == "__main__":     unittest.main()

Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:

implicit val sqlCtx = spark.sqlContext import spark.implicits._ val source = MemoryStream[Record] source.addData(Record(1.0, Vectors.dense(3.0, 5.0))) source.addData(Record(2.0, Vectors.dense(4.0, 6.0))) val predictDs = source.toDF() service.predict(predictDs).awaitTermination(2000)

Полный пример на Scala (здесь, для разнообразия, не используется sql):

package aaa.abc.dd.spark_streaming_pr.cluster  import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.udf import org.apache.spark.sql.streaming.StreamingQuery  class StructuredStreamingService {   var service: LinearRegression = _   var model: LinearRegressionModel = _    def train(ds: DataFrame): Unit = {     service = new LinearRegression().setMaxIter(10).setRegParam(0.01)     model = service.fit(ds)   }    def predict(ds: DataFrame): StreamingQuery = {      val m = ds.sparkSession.sparkContext.broadcast(model)      def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {       m.value.predict(features)     }      val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun      val toUpperUdf = udf(transform)      val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))      predictionDs       .writeStream       .foreachBatch((r: DataFrame, i: Long) => {         r.show()         // scalastyle:off println         println(s"$i")         // scalastyle:on println       })       .start()   } }

Тест:

package aaa.abc.dd.spark_streaming_pr.cluster  import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MemoryStream import org.scalatest.{Matchers, Outcome, fixture}  class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {   test("run") { spark =>     // Prepare data.     val trainDs = spark.createDataFrame(Seq(       (6.0, Vectors.dense(1.0, 1.0)),       (8.0, Vectors.dense(1.0, 2.0)),       (9.0, Vectors.dense(2.0, 2.0)),       (11.0, Vectors.dense(2.0, 3.0))     )).toDF("label", "features")     // Create model and train.     val service = new StructuredStreamingService()     service.train(trainDs)     // Predict and results.     implicit val sqlCtx = spark.sqlContext     import spark.implicits._     val source = MemoryStream[Record]     source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))     source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))     val predictDs = source.toDF()     service.predict(predictDs).awaitTermination(2000)     // +-----+---------+------------------+     // |label| features|        prediction|     // +-----+---------+------------------+     // |  1.0|[3.0,5.0]|15.966990887541273|     // |  2.0|[4.0,6.0]|18.961384020443553|     // +-----+---------+------------------+   }    override protected def withFixture(test: OneArgTest): Outcome = {     val spark = SparkSession.builder().master("local[2]").getOrCreate()      try withFixture(test.toNoArgTest(spark))      finally spark.stop()   }    override type FixtureParam = SparkSession    case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)  }

Выводы

При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.

Такие абстракции как “DataFrame” позволяют это сделать легко и просто.

При использовании Python данные придется хранить в файлах.

Ссылки и ресурсы

ссылка на оригинал статьи https://habr.com/ru/post/535938/