{"id":315906,"date":"2021-01-02T21:00:26","date_gmt":"2021-01-02T21:00:26","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=315906"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=315906","title":{"rendered":"Testing in Apache Spark Structured Streaming"},"content":{"rendered":"\n<div class=\"post__text post__text-html post__text_v1\" id=\"post-content-body\">\n<h1 id=\"vvedenie\">Introduction<\/h1>\n<p>  <\/p>\n<p>There are currently few published examples of tests for applications built on Spark Structured Streaming. This article therefore walks through basic test examples in detail. 
<\/p>\n<p>  <\/p>\n<p>All examples use Apache Spark 3.0.1.<\/p>\n<h1 id=\"podgotovka\">Preparation<\/h1>\n<p>  <\/p>\n<p>Install the following: <\/p>\n<p>  <\/p>\n<ul>\n<li>Apache Spark 3.0.x<\/li>\n<li>Python 3.7 and a virtual environment for it<\/li>\n<li>Conda 4.y<\/li>\n<li>scikit-learn 0.22.z<\/li>\n<li>Maven 3.v<\/li>\n<li>Scala 2.12.10 (used in the Scala examples)<\/li>\n<\/ul>\n<p>  <\/p>\n<ol>\n<li>Download <a href=\"https:\/\/apache-mirror.rbc.ru\/pub\/apache\/spark\/spark-3.0.1\/spark-3.0.1-bin-hadoop2.7.tgz\" rel=\"nofollow\">Apache Spark<\/a>.<\/li>\n<li>Unpack it: <code>tar -xvzf .\/spark-3.0.1-bin-hadoop2.7.tgz<\/code><\/li>\n<li>Create an environment, for example with conda: <code>conda create -n sp python=3.7<\/code><\/li>\n<\/ol>\n<p>  <\/p>\n<p>Next, set the environment variables. 
The example below is for a local run.<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">export SPARK_HOME=\/Users\/$USER\/Documents\/spark\/spark-3.0.1-bin-hadoop2.7\nexport PYTHONPATH=$SPARK_HOME\/python:$SPARK_HOME\/python\/lib\/py4j-0.10.9-src.zip<\/code><\/pre>\n<p>  <\/p>\n<h1 id=\"testy\">Tests<\/h1>\n<p>  <\/p>\n<h2 id=\"primer-s-scikit-learn\">Example with scikit-learn<\/h2>\n<p>  <\/p>\n<p>When writing tests, structure the code so that the logic can be isolated from the actual calls to the target API. 
A good example of such isolation is the DataFrame abstraction: <a href=\"https:\/\/pandas.pydata.org\/pandas-docs\/stable\/reference\/api\/pandas.DataFrame.html\" rel=\"nofollow\">DataFrame-pandas<\/a>, <a href=\"https:\/\/spark.apache.org\/docs\/latest\/sql-getting-started.html#creating-dataframes\" rel=\"nofollow\">DataFrame-spark<\/a>.<\/p>\n<p>  <\/p>\n<p>The tests are built around the following example: <a href=\"https:\/\/scikit-learn.org\/stable\/modules\/generated\/sklearn.linear_model.LinearRegression.html#sklearn.linear_model.LinearRegression\" rel=\"nofollow\">LinearRegression<\/a>.<\/p>\n<p>  <\/p>\n<p>So let the code under test follow this &quot;template&quot; in Python:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">class XService:\n\n    def __init__(self):\n        # Initialization.\n        pass\n\n    def train(self, ds):\n        # Training.\n        pass\n\n    def predict(self, ds):\n        # Prediction and output of the results.\n        pass<\/code><\/pre>\n<p>  <\/p>\n<p>The Scala template is analogous.<\/p>\n<p>  <\/p>\n<p>Full example:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">from sklearn import linear_model\n\n\nclass LocalService:\n\n    def __init__(self):\n        self.model = linear_model.LinearRegression()\n\n    def train(self, ds):\n        X, y = ds\n        self.model.fit(X, y)\n\n    def predict(self, ds):\n        r = self.model.predict(ds)\n        print(r)<\/code><\/pre>\n<p>  <\/p>\n<p>The test.<\/p>\n<p>  <\/p>\n<p>Imports:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">import unittest\nimport numpy as np<\/code><\/pre>\n<p>  <\/p>\n<p>The main class:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">class RunTest(unittest.TestCase):<\/code><\/pre>\n<p>  <\/p>\n<p>Running the tests:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">if __name__ == &quot;__main__&quot;:\n    unittest.main()<\/code><\/pre>\n<p>  <\/p>\n<p>Preparing the data:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">X = np.array([\n    [1, 1],  # 6\n    [1, 2],  # 8\n    [2, 2],  # 9\n    [2, 3]  # 11\n])\ny = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3<\/code><\/pre>\n<p>  <\/p>\n<p>Creating and training the model:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">service = local_service.LocalService()\nservice.train((X, y))<\/code><\/pre>\n<p>  <\/p>\n<p>Getting the results:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">service.predict(np.array([[3, 5]]))\nservice.predict(np.array([[4, 6]]))<\/code><\/pre>\n<p>  <\/p>\n<p>Output:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">[16.]\n[19.]<\/code><\/pre>\n<p>  <\/p>\n<p>All together:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">import unittest\nimport numpy as np\n\nfrom spark_streaming_pp import local_service\n\n\nclass RunTest(unittest.TestCase):\n    def test_run(self):\n        # Prepare data.\n        X = np.array([\n            [1, 1],  # 6\n            [1, 2],  # 8\n            [2, 2],  # 9\n            [2, 3]  # 11\n        ])\n        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3\n        # Create model and train.\n        service = local_service.LocalService()\n        service.train((X, y))\n        # Predict and results.\n        service.predict(np.array([[3, 5]]))\n        service.predict(np.array([[4, 6]]))\n        # [16.]\n        # [19.]\n\n\nif __name__ == &quot;__main__&quot;:\n    unittest.main()<\/code><\/pre>\n<p>  <\/p>\n<h2 id=\"primer-s-spark-i-python\">Example with Spark and Python<\/h2>\n<p>  <\/p>\n<p>The analogous Spark ML algorithm is used: <a href=\"https:\/\/spark.apache.org\/docs\/latest\/ml-classification-regression.html#linear-regression\" rel=\"nofollow\">LinearRegression<\/a>. 
Note that <a href=\"https:\/\/spark.apache.org\/docs\/latest\/structured-streaming-programming-guide.html\" rel=\"nofollow\">Structured Streaming<\/a> is built on the same DataFrames that are used in <a href=\"https:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html\" rel=\"nofollow\">Spark SQL<\/a>. But, as usual, there are nuances.<\/p>\n<p>  <\/p>\n<p>Initialization:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">self.service = LinearRegression(maxIter=10, regParam=0.01)\nself.model = None<\/code><\/pre>\n<p>  <\/p>\n<p>Training:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">self.model = self.service.fit(ds)<\/code><\/pre>\n<p>  <\/p>\n<p>Getting the results:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">transformed_ds = self.model.transform(ds)\nq = transformed_ds.select(&quot;label&quot;, &quot;prediction&quot;).writeStream.format(&quot;console&quot;).start()\nreturn q<\/code><\/pre>\n<p>  <\/p>\n<p>All together:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">from pyspark.ml.regression import LinearRegression\n\n\nclass StructuredStreamingService:\n    def __init__(self):\n        self.service = LinearRegression(maxIter=10, regParam=0.01)\n        self.model = None\n\n    def train(self, ds):\n        # Training uses a regular (batch) DataFrame.\n        self.model = self.service.fit(ds)\n\n    def predict(self, ds):\n        # ds is a streaming DataFrame; the query writes label and prediction to the console.\n        transformed_ds = self.model.transform(ds)\n        q = transformed_ds.select(&quot;label&quot;, &quot;prediction&quot;).writeStream.format(&quot;console&quot;).start()\n        return q<\/code><\/pre>\n<p>  <\/p>\n<p>The test itself.<\/p>\n<p>  <\/p>\n<p>In tests, data is usually created directly in the test code:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">train_ds = spark.createDataFrame([\n    (6.0, Vectors.dense([1.0, 1.0])),\n    (8.0, Vectors.dense([1.0, 2.0])),\n    (9.0, Vectors.dense([2.0, 2.0])),\n    (11.0, Vectors.dense([2.0, 3.0]))\n],\n    [&quot;label&quot;, &quot;features&quot;]\n)<\/code><\/pre>\n<p>  <\/p>\n<p>This is very convenient and keeps the code compact.<\/p>\n<p>  <\/p>\n<p>Unfortunately, this approach does not work as input for Structured Streaming. A DataFrame created this way satisfies the DataFrame contract, but it is not a streaming DataFrame (its isStreaming property is False), so calling writeStream on it fails.<br \/>  For now, test sources can be created the same way as in the <a href=\"https:\/\/github.com\/apache\/spark\/blob\/master\/python\/pyspark\/sql\/tests\/test_streaming.py\" rel=\"nofollow\">Spark tests<\/a>, i.e. by reading a stream from files:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">def test_stream_read_options_overwrite(self):\n    bad_schema = StructType([StructField(&quot;test&quot;, IntegerType(), False)])\n    schema = StructType([StructField(&quot;data&quot;, StringType(), False)])\n    df = self.spark.readStream.format('csv').option('path', 'python\/test_support\/sql\/fake') \\\n        .schema(bad_schema)\\\n        .load(path='python\/test_support\/sql\/streaming', schema=schema, format='text')\n    self.assertTrue(df.isStreaming)\n    self.assertEqual(df.schema.simpleString(), &quot;struct&lt;data:string&gt;&quot;)<\/code><\/pre>\n<p>  <\/p>\n<p>Now, step by step.<\/p>\n<p>  <\/p>\n<p>Create the working context (SparkSession):<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">spark = SparkSession.builder.enableHiveSupport().getOrCreate()\nspark.sparkContext.setLogLevel(&quot;ERROR&quot;)<\/code><\/pre>\n<p>  <\/p>\n<p>Prepare the training data (this can be done the usual way):<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">train_ds = spark.createDataFrame([\n    (6.0, Vectors.dense([1.0, 1.0])),\n    (8.0, Vectors.dense([1.0, 2.0])),\n    (9.0, Vectors.dense([2.0, 2.0])),\n    (11.0, Vectors.dense([2.0, 3.0]))\n],\n    [&quot;label&quot;, &quot;features&quot;]\n)<\/code><\/pre>\n<p>  <\/p>\n<p>Training:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">service = structure_streaming_service.StructuredStreamingService()\nservice.train(train_ds)<\/code><\/pre>\n<p>  <\/p>\n<p>Getting the results. First, the data is read from a file, and the features and an identifier for each object are extracted. 
Then the prediction is started, waiting up to 3 seconds for the query (in PySpark, the awaitTermination timeout is given in seconds).<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">def extract_features(x):\n    values = x.split(&quot;,&quot;)\n    features_ = []\n    for i in values[1:]:\n        features_.append(float(i))\n    features = Vectors.dense(features_)\n    return features\n\nextract_features_udf = udf(extract_features, VectorUDT())\n\ndef extract_label(x):\n    values = x.split(&quot;,&quot;)\n    label = float(values[0])\n    return label\n\nextract_label_udf = udf(extract_label, FloatType())\n\npredict_ds = spark.readStream.format(&quot;text&quot;).option(&quot;path&quot;, &quot;data\/structured_streaming&quot;).load() \\\n    .withColumn(&quot;features&quot;, extract_features_udf(col(&quot;value&quot;))) \\\n    .withColumn(&quot;label&quot;, extract_label_udf(col(&quot;value&quot;)))\n\nservice.predict(predict_ds).awaitTermination(3)<\/code><\/pre>\n<p>  <\/p>\n<p>Output:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">15.96699\n18.96138<\/code><\/pre>\n<p>  <\/p>\n<p>All together:<\/p>\n<p>  <\/p>\n<pre><code class=\"python\">import unittest\nimport warnings\n\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col, udf\nfrom pyspark.sql.types import FloatType\nfrom pyspark.ml.linalg import Vectors, VectorUDT\n\nfrom spark_streaming_pp import structure_streaming_service\n\n\nclass RunTest(unittest.TestCase):\n    def test_run(self):\n        spark = SparkSession.builder.enableHiveSupport().getOrCreate()\n        spark.sparkContext.setLogLevel(&quot;ERROR&quot;)\n        # Prepare data.\n        train_ds = spark.createDataFrame([\n            (6.0, Vectors.dense([1.0, 1.0])),\n            (8.0, Vectors.dense([1.0, 2.0])),\n            (9.0, Vectors.dense([2.0, 2.0])),\n            (11.0, Vectors.dense([2.0, 3.0]))\n        ],\n            [&quot;label&quot;, &quot;features&quot;]\n        )\n        # Create model and train.\n        service = structure_streaming_service.StructuredStreamingService()\n        service.train(train_ds)\n\n        # Predict and results.\n\n        def extract_features(x):\n            values = x.split(&quot;,&quot;)\n            features_ = []\n            for i in values[1:]:\n                features_.append(float(i))\n            features = Vectors.dense(features_)\n            return features\n\n        extract_features_udf = udf(extract_features, VectorUDT())\n\n        def extract_label(x):\n            values = x.split(&quot;,&quot;)\n            label = float(values[0])\n            return label\n\n        extract_label_udf = udf(extract_label, FloatType())\n\n        predict_ds = spark.readStream.format(&quot;text&quot;).option(&quot;path&quot;, &quot;data\/structured_streaming&quot;).load() \\\n            .withColumn(&quot;features&quot;, extract_features_udf(col(&quot;value&quot;))) \\\n            .withColumn(&quot;label&quot;, extract_label_udf(col(&quot;value&quot;)))\n\n        query = service.predict(predict_ds)\n        query.awaitTermination(3)\n        # Stop the query so it does not keep running after the test.\n        query.stop()\n\n        # +-----+------------------+\n        # |label|        prediction|\n        # +-----+------------------+\n        # |  1.0|15.966990887541273|\n        # |  2.0|18.961384020443553|\n        # +-----+------------------+\n\n    def setUp(self):\n        warnings.filterwarnings(&quot;ignore&quot;, category=ResourceWarning)\n        warnings.filterwarnings(&quot;ignore&quot;, category=DeprecationWarning)\n\n\nif __name__ == &quot;__main__&quot;:\n    unittest.main()<\/code><\/pre>\n<p>  <\/p>\n<p>Note that in Scala the input stream can be created in memory with MemoryStream.<br \/>  This could look like the following (in Scala, the awaitTermination timeout is given in milliseconds):<\/p>\n<p>  <\/p>\n<pre><code class=\"scala\">implicit val sqlCtx = spark.sqlContext\nimport spark.implicits._\nval source = MemoryStream[Record]\nsource.addData(Record(1.0, Vectors.dense(3.0, 5.0)))\nsource.addData(Record(2.0, Vectors.dense(4.0, 6.0)))\nval predictDs = source.toDF()\nservice.predict(predictDs).awaitTermination(2000)<\/code><\/pre>\n<p>  <\/p>\n<p>Full Scala example (for variety, the model's transform is not used here. Instead, the trained model is broadcast and applied through a UDF):<\/p>\n<p>  <\/p>\n<pre><code class=\"scala\">package aaa.abc.dd.spark_streaming_pr.cluster\n\nimport org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}\nimport org.apache.spark.sql.DataFrame\nimport org.apache.spark.sql.functions.udf\nimport org.apache.spark.sql.streaming.StreamingQuery\n\nclass StructuredStreamingService {\n  var service: LinearRegression = _\n  var model: LinearRegressionModel = _\n\n  def train(ds: DataFrame): Unit = {\n    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)\n    model = service.fit(ds)\n  }\n\n  def predict(ds: DataFrame): StreamingQuery = {\n\n    val m = ds.sparkSession.sparkContext.broadcast(model)\n\n    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {\n      m.value.predict(features)\n    }\n\n    val transform: org.apache.spark.ml.linalg.Vector =&gt; Double = transformFun\n\n    val predictUdf = udf(transform)\n\n    val predictionDs = ds.withColumn(&quot;prediction&quot;, predictUdf(ds(&quot;features&quot;)))\n\n    predictionDs\n      .writeStream\n      .foreachBatch((r: DataFrame, i: Long) =&gt; {\n        r.show()\n        \/\/ scalastyle:off println\n        println(s&quot;$i&quot;)\n        \/\/ scalastyle:on println\n      })\n      .start()\n  }\n}<\/code><\/pre>\n<p>  <\/p>\n<p>Test:<\/p>\n<p>  <\/p>\n<pre><code class=\"scala\">package aaa.abc.dd.spark_streaming_pr.cluster\n\nimport org.apache.spark.ml.linalg.Vectors\nimport org.apache.spark.sql.SparkSession\nimport org.apache.spark.sql.execution.streaming.MemoryStream\nimport org.scalatest.{Matchers, Outcome, fixture}\n\nclass StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {\n  test(&quot;run&quot;) { spark =&gt;\n    \/\/ Prepare data.\n    val trainDs = spark.createDataFrame(Seq(\n      (6.0, Vectors.dense(1.0, 1.0)),\n      (8.0, Vectors.dense(1.0, 2.0)),\n      (9.0, Vectors.dense(2.0, 2.0)),\n      (11.0, Vectors.dense(2.0, 3.0))\n    )).toDF(&quot;label&quot;, &quot;features&quot;)\n    \/\/ Create model and train.\n    val service = new StructuredStreamingService()\n    service.train(trainDs)\n    \/\/ Predict and results.\n    implicit val sqlCtx = spark.sqlContext\n    import spark.implicits._\n    val source = MemoryStream[Record]\n    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))\n    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))\n    val predictDs = source.toDF()\n    service.predict(predictDs).awaitTermination(2000)\n    \/\/ +-----+---------+------------------+\n    \/\/ |label| features|        prediction|\n    \/\/ +-----+---------+------------------+\n    \/\/ |  1.0|[3.0,5.0]|15.966990887541273|\n    \/\/ |  2.0|[4.0,6.0]|18.961384020443553|\n    \/\/ +-----+---------+------------------+\n  }\n\n  override protected def withFixture(test: OneArgTest): Outcome = {\n    val spark = SparkSession.builder().master(&quot;local[2]&quot;).getOrCreate()\n    try withFixture(test.toNoArgTest(spark))\n    finally spark.stop()\n  }\n\n  override type FixtureParam = SparkSession\n\n  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)\n\n}<\/code><\/pre>\n<p>  <\/p>\n<h1 id=\"vyvody\">Conclusions<\/h1>\n<p>  <\/p>\n<p>When writing tests, structure the code so that the logic is kept separate from concrete API calls. Any available data source can then be used as input. 
This includes <a href=\"https:\/\/spark.apache.org\/docs\/latest\/structured-streaming-kafka-integration.html\" rel=\"nofollow\">Kafka<\/a>. <\/p>\n<p>  <\/p>\n<p>Abstractions such as DataFrame make this easy and straightforward. <\/p>\n<p>  <\/p>\n<p>In Python, which has no counterpart to Scala's MemoryStream, the test input has to be stored in files.<\/p>\n<p>  <\/p>\n<h1 id=\"ssylki-i-resursy\">Links and resources<\/h1>\n<p>  <\/p>\n<ul>\n<li><a href=\"https:\/\/github.com\/kartzum\/intro_to_apache_kafka\/tree\/main\/story\/resources\/spark_streaming_pp\" rel=\"nofollow\">Source code: Python<\/a><\/li>\n<li><a href=\"https:\/\/github.com\/kartzum\/intro_to_apache_kafka\/tree\/main\/story\/resources\/spark-streaming-pr\" rel=\"nofollow\">Source code: Scala<\/a><\/li>\n<\/ul>\n<\/div>\n<p>Link to the original article: <a href=\"https:\/\/habr.com\/ru\/post\/535938\/\">https:\/\/habr.com\/ru\/post\/535938\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"\n<div class=\"post__text post__text-html post__text_v1\" id=\"post-content-body\">\n<h1 id=\"vvedenie\">Introduction<\/h1>\n<p>  <\/p>\n<p>There are currently few published examples of tests for applications built on Spark Structured Streaming. This article therefore walks through basic test examples in detail. <\/p>\n<p>  <\/p>\n<p>All examples use Apache Spark 3.0.1.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-315906","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/315906","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=315906"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/315906\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=
%2Fwp%2Fv2%2Fmedia&parent=315906"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=315906"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=315906"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}