Введение
На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.
Все примеры используют: Apache Spark 3.0.1.
Подготовка
Необходимо установить:
- Apache Spark 3.0.x
- Python 3.7 и виртуальное окружение для него
- Conda 4.y
- scikit-learn 0.22.z
- Maven 3.v
- В примерах для Scala используется версия 2.12.10.
- Загрузить Apache Spark
- Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
- Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7
Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.
SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7 PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;
Тесты
Пример с scikit-learn
При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.
Для написания тестов будет использоваться следующий пример: LinearRegression.
Итак, пусть код для тестирования использует следующий "шаблон" для Python:
class XService: def __init__(self): # Инициализация def train(self, ds): # Обучение def predict(self, ds): # Предсказание и вывод результатов
Для Scala шаблон выглядит соответственно.
Полный пример:
from sklearn import linear_model class LocalService: def __init__(self): self.model = linear_model.LinearRegression() def train(self, ds): X, y = ds self.model.fit(X, y) def predict(self, ds): r = self.model.predict(ds) print(r)
Тест.
Импорт:
import unittest import numpy as np
Основной класс:
class RunTest(unittest.TestCase):
Запуск тестов:
if __name__ == "__main__": unittest.main()
Подготовка данных:
X = np.array([ [1, 1], # 6 [1, 2], # 8 [2, 2], # 9 [2, 3] # 11 ]) y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
Создание модели и обучение:
service = local_service.LocalService() service.train((X, y))
Получение результатов:
service.predict(np.array([[3, 5]])) service.predict(np.array([[4, 6]]))
Ответ:
[16.] [19.]
Все вместе:
import unittest import numpy as np from spark_streaming_pp import local_service class RunTest(unittest.TestCase): def test_run(self): # Prepare data. X = np.array([ [1, 1], # 6 [1, 2], # 8 [2, 2], # 9 [2, 3] # 11 ]) y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3 # Create model and train. service = local_service.LocalService() service.train((X, y)) # Predict and results. service.predict(np.array([[3, 5]])) service.predict(np.array([[4, 6]])) # [16.] # [19.] if __name__ == "__main__": unittest.main()
Пример с Spark и Python
Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.
Инициализация:
self.service = LinearRegression(maxIter=10, regParam=0.01) self.model = None
Обучение:
self.model = self.service.fit(ds)
Получение результатов:
transformed_ds = self.model.transform(ds) q = transformed_ds.select("label", "prediction").writeStream.format("console").start() return q
Все вместе:
from pyspark.ml.regression import LinearRegression class StructuredStreamingService: def __init__(self): self.service = LinearRegression(maxIter=10, regParam=0.01) self.model = None def train(self, ds): self.model = self.service.fit(ds) def predict(self, ds): transformed_ds = self.model.transform(ds) q = transformed_ds.select("label", "prediction").writeStream.format("console").start() return q
Сам тест.
Обычно в тестах можно использовать данные, которые создаются прямо в тестах.
train_ds = spark.createDataFrame([ (6.0, Vectors.dense([1.0, 1.0])), (8.0, Vectors.dense([1.0, 2.0])), (9.0, Vectors.dense([2.0, 2.0])), (11.0, Vectors.dense([2.0, 3.0])) ], ["label", "features"] )
Это очень удобно и код получается компактным.
Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.
def test_stream_read_options_overwrite(self): bad_schema = StructType([StructField("test", IntegerType(), False)]) schema = StructType([StructField("data", StringType(), False)]) df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \ .schema(bad_schema)\ .load(path='python/test_support/sql/streaming', schema=schema, format='text') self.assertTrue(df.isStreaming) self.assertEqual(df.schema.simpleString(), "struct<data:string>")
И так.
Создается контекст для работы:
spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark.sparkContext.setLogLevel("ERROR")
Подготовка данных для обучения (можно сделать обычным способом):
train_ds = spark.createDataFrame([ (6.0, Vectors.dense([1.0, 1.0])), (8.0, Vectors.dense([1.0, 2.0])), (9.0, Vectors.dense([2.0, 2.0])), (11.0, Vectors.dense([2.0, 3.0])) ], ["label", "features"] )
Обучение:
service = structure_streaming_service.StructuredStreamingService() service.train(train_ds)
Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.
def extract_features(x): values = x.split(",") features_ = [] for i in values[1:]: features_.append(float(i)) features = Vectors.dense(features_) return features extract_features_udf = udf(extract_features, VectorUDT()) def extract_label(x): values = x.split(",") label = float(values[0]) return label extract_label_udf = udf(extract_label, FloatType()) predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \ .withColumn("features", extract_features_udf(col("value"))) \ .withColumn("label", extract_label_udf(col("value"))) service.predict(predict_ds).awaitTermination(3)
Ответ:
15.96699 18.96138
Все вместе:
import unittest import warnings from pyspark.sql import SparkSession from pyspark.sql.functions import col, udf from pyspark.sql.types import FloatType from pyspark.ml.linalg import Vectors, VectorUDT from spark_streaming_pp import structure_streaming_service class RunTest(unittest.TestCase): def test_run(self): spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark.sparkContext.setLogLevel("ERROR") # Prepare data. train_ds = spark.createDataFrame([ (6.0, Vectors.dense([1.0, 1.0])), (8.0, Vectors.dense([1.0, 2.0])), (9.0, Vectors.dense([2.0, 2.0])), (11.0, Vectors.dense([2.0, 3.0])) ], ["label", "features"] ) # Create model and train. service = structure_streaming_service.StructuredStreamingService() service.train(train_ds) # Predict and results. def extract_features(x): values = x.split(",") features_ = [] for i in values[1:]: features_.append(float(i)) features = Vectors.dense(features_) return features extract_features_udf = udf(extract_features, VectorUDT()) def extract_label(x): values = x.split(",") label = float(values[0]) return label extract_label_udf = udf(extract_label, FloatType()) predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \ .withColumn("features", extract_features_udf(col("value"))) \ .withColumn("label", extract_label_udf(col("value"))) service.predict(predict_ds).awaitTermination(3) # +-----+------------------+ # |label| prediction| # +-----+------------------+ # | 1.0|15.966990887541273| # | 2.0|18.961384020443553| # +-----+------------------+ def setUp(self): warnings.filterwarnings("ignore", category=ResourceWarning) warnings.filterwarnings("ignore", category=DeprecationWarning) if __name__ == "__main__": unittest.main()
Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:
implicit val sqlCtx = spark.sqlContext import spark.implicits._ val source = MemoryStream[Record] source.addData(Record(1.0, Vectors.dense(3.0, 5.0))) source.addData(Record(2.0, Vectors.dense(4.0, 6.0))) val predictDs = source.toDF() service.predict(predictDs).awaitTermination(2000)
Полный пример на Scala (здесь, для разнообразия, не используется sql):
package aaa.abc.dd.spark_streaming_pr.cluster import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.udf import org.apache.spark.sql.streaming.StreamingQuery class StructuredStreamingService { var service: LinearRegression = _ var model: LinearRegressionModel = _ def train(ds: DataFrame): Unit = { service = new LinearRegression().setMaxIter(10).setRegParam(0.01) model = service.fit(ds) } def predict(ds: DataFrame): StreamingQuery = { val m = ds.sparkSession.sparkContext.broadcast(model) def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = { m.value.predict(features) } val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun val toUpperUdf = udf(transform) val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features"))) predictionDs .writeStream .foreachBatch((r: DataFrame, i: Long) => { r.show() // scalastyle:off println println(s"$i") // scalastyle:on println }) .start() } }
Тест:
package aaa.abc.dd.spark_streaming_pr.cluster import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MemoryStream import org.scalatest.{Matchers, Outcome, fixture} class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers { test("run") { spark => // Prepare data. val trainDs = spark.createDataFrame(Seq( (6.0, Vectors.dense(1.0, 1.0)), (8.0, Vectors.dense(1.0, 2.0)), (9.0, Vectors.dense(2.0, 2.0)), (11.0, Vectors.dense(2.0, 3.0)) )).toDF("label", "features") // Create model and train. val service = new StructuredStreamingService() service.train(trainDs) // Predict and results. implicit val sqlCtx = spark.sqlContext import spark.implicits._ val source = MemoryStream[Record] source.addData(Record(1.0, Vectors.dense(3.0, 5.0))) source.addData(Record(2.0, Vectors.dense(4.0, 6.0))) val predictDs = source.toDF() service.predict(predictDs).awaitTermination(2000) // +-----+---------+------------------+ // |label| features| prediction| // +-----+---------+------------------+ // | 1.0|[3.0,5.0]|15.966990887541273| // | 2.0|[4.0,6.0]|18.961384020443553| // +-----+---------+------------------+ } override protected def withFixture(test: OneArgTest): Outcome = { val spark = SparkSession.builder().master("local[2]").getOrCreate() try withFixture(test.toNoArgTest(spark)) finally spark.stop() } override type FixtureParam = SparkSession case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector) }
Выводы
При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.
Такие абстракции как “DataFrame” позволяют это сделать легко и просто.
При использовании Python данные придется хранить в файлах.
Ссылки и ресурсы
ссылка на оригинал статьи https://habr.com/ru/post/535938/
Добавить комментарий