Spark 2.1.0 入门:构建一个机器学习工作流

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

工作流(ML Pipelines)例子

本节以逻辑斯蒂回归为例,构建一个典型的机器学习过程,来具体介绍一下工作流是如何应用的。我们的目的是查找出所有包含"spark"的句子,即将包含"spark"的句子的标签设为1,没有"spark"的句子的标签设为0。Spark2.0起,SQLContextHiveContext已经不再推荐使用,改以SparkSession代之,故本文中不再使用SQLContext来进行相关的操作,关于SparkSession的具体详情,这里不再赘述,可以参看Spark2.0的官方文档

Spark2.0以上版本的spark-shell在启动时会自动创建一个名为sparkSparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:

  1. import org.apache.spark.sql.SparkSession
  2. val spark = SparkSession.builder().
  3. master("local").
  4. appName("my App Name").
  5. getOrCreate()
scala

SQLContext一样,也可以开启RDD的隐式转换:

  1. import spark.implicits._
  2.  
scala

下文中,我们默认名为sparkSparkSession已经创建。

然后,我们引入要包含的包并构建训练数据集。

  1. import org.apache.spark.ml.feature._
  2. import org.apache.spark.ml.classification.LogisticRegression
  3. import org.apache.spark.ml.{Pipeline,PipelineModel}
  4. import org.apache.spark.ml.linalg.Vector
  5. import org.apache.spark.sql.Row
  6.  
  7.  
  8. scala> val training = spark.createDataFrame(Seq(
  9. | (0L, "a b c d e spark", 1.0),
  10. | (1L, "b d", 0.0),
  11. | (2L, "spark f g h", 1.0),
  12. | (3L, "hadoop mapreduce", 0.0)
  13. | )).toDF("id", "text", "label")
  14. training: org.apache.spark.sql.DataFrame = [id: bigint, text: string, label: double]
scala

在这一步中我们要定义 Pipeline 中的各个工作流阶段PipelineStage,包括转换器和评估器,具体的,包含tokenizer, hashingTF和lr三个步骤。

  1. scala> val tokenizer = new Tokenizer().
  2. | setInputCol("text").
  3. | setOutputCol("words")
  4. tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_5151ed4fa43e
  5.  
  6. scala> val hashingTF = new HashingTF().
  7. | setNumFeatures(1000).
  8. | setInputCol(tokenizer.getOutputCol).
  9. | setOutputCol("features")
  10. hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_332f74b21ecb
  11.  
  12. scala> val lr = new LogisticRegression().
  13. | setMaxIter(10).
  14. | setRegParam(0.01)
  15. lr: org.apache.spark.ml.classification.LogisticRegression = logreg_28a670ae952f
scala

有了这些处理特定问题的转换器和评估器,接下来就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。

  1. scala> val pipeline = new Pipeline().
  2. | setStages(Array(tokenizer, hashingTF, lr))
  3. pipeline: org.apache.spark.ml.Pipeline = pipeline_4dabd24db001
scala

现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer。

  1. scala> val model = pipeline.fit(training)
  2. model: org.apache.spark.ml.PipelineModel = pipeline_4dabd24db001
scala

我们可以看到,model的类型是一个PipelineModel,这个管道模型将在测试数据的时候使用。所以接下来,我们先构建测试数据。

  1. scala> val test = spark.createDataFrame(Seq(
  2. | (4L, "spark i j k"),
  3. | (5L, "l m n"),
  4. | (6L, "spark a"),
  5. | (7L, "apache hadoop")
  6. | )).toDF("id", "text")
  7. test: org.apache.spark.sql.DataFrame = [id: bigint, text: string]
scala

然后,我们调用我们训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的工作流,生成我们所需要的预测结果。

  1. scala> model.transform(test).
  2. | select("id", "text", "probability", "prediction").
  3. | collect().
  4. | foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
  5. | println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  6. | }
  7. (4, spark i j k) --> prob=[0.5406433544851421,0.45935664551485783], prediction=0.0
  8. (5, l m n) --> prob=[0.9334382627383259,0.06656173726167405], prediction=0.0
  9. (6, spark a) --> prob=[0.15041430048068286,0.8495856995193171], prediction=1.0
  10. (7, apache hadoop) --> prob=[0.9768636139518304,0.023136386048169585], prediction=0.0
scala

通过上述结果,我们可以看到,第4句和第6句中都包含"spark",其中第六句的预测是1,与我们希望的相符;而第4句虽然预测的依然是0,但是通过概率我们可以看到,第4句有46%的概率预测是1,而第5句、第7句分别只有7%和2%的概率预测为1,这是由于训练数据集较少,如果有更多的测试数据进行学习,预测的准确率将会有显著提升。

子雨大数据之Spark入门
扫一扫访问本博客