Spark入门:决策树分类器

大数据技术原理与应用

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
[返回Spark教程首页]

一、方法简介

​ 决策树(decision tree)是一种基本的分类与回归方法,这里主要介绍用于分类的决策树。决策树模式呈树形结构,其中每个内部节点表示一个属性上的测试,每个分支代表一个测试输出,每个叶节点代表一种类别。学习时利用训练数据,根据损失函数最小化的原则建立决策树模型;预测时,对新的数据,利用决策树模型进行分类。

二、基本原理

​ 决策树学习通常包括3个步骤:特征选择、决策树的生成和决策树的剪枝。

(一)特征选择

​ 特征选择在于选取对训练数据具有分类能力的特征,这样可以提高决策树学习的效率。通常特征选择的准则是信息增益(或信息增益比、基尼指数等),每次计算每个特征的信息增益,并比较它们的大小,选择信息增益最大(信息增益比最大、基尼指数最小)的特征。下面我们重点介绍一下特征选择的准则:信息增益。

​ 首先定义信息论中广泛使用的一个度量标准——熵(entropy),它是表示随机变量不确定性的度量。熵越大,随机变量的不确定性就越大。而信息增益(informational entropy)表示得知某一特征后使得信息的不确定性减少的程度。简单的说,一个属性的信息增益就是由于使用这个属性分割样例而导致的期望熵降低。信息增益、信息增益比和基尼指数的具体定义如下:

信息增益:特征A对训练数据集D的信息增益 \(g(D,A)\) ,定义为集合D的经验熵 \(H(D)\) 与特征A给定条件下D的经验条件熵 \(H(D|A)\) 之差,即
\(g(D,A)=H(D)-H(D|A)\)
信息增益比:特征A对训练数据集D的信息增益比 \(g_R(D,A)\) 定义为其信息增益 \(g(D,A)\) 与训练数据集D关于特征A的值的熵 \(H_A(D)\) 之比,即

\(g_R(D,A)=\frac{g(D,A)}{H_A(D)}\)
其中, \(H_A(D)=-\sum_{i=1}^{n}\frac{\left|D_i\right|}{\left|D\right|}log_2\frac{\left|D_i\right|}{\left|D\right|}\) ,n是特征A取值的个数。

基尼指数:分类问题中,假设有K个类,样本点属于第K类的概率为 \(p_k\) ,则概率分布的基尼指数定义为
\(Gini(p)=\sum_{k=1}^{K}p_k(1-p_k)=1-\sum_{k=1}^{K}p_k^2\)

(二)决策树的生成

​ 从根结点开始,对结点计算所有可能的特征的信息增益,选择信息增益最大的特征作为结点的特征,由该特征的不同取值建立子结点,再对子结点递归地调用以上方法,构建决策树;直到所有特征的信息增均很小或没有特征可以选择为止,最后得到一个决策树。

​ 决策树需要有停止条件来终止其生长的过程。一般来说最低的条件是:当该节点下面的所有记录都属于同一类,或者当所有的记录属性都具有相同的值时。这两种条件是停止决策树的必要条件,也是最低的条件。在实际运用中一般希望决策树提前停止生长,限定叶节点包含的最低数据量,以防止由于过度生长造成的过拟合问题。

(三)决策树的剪枝

​ 决策树生成算法递归地产生决策树,直到不能继续下去为止。这样产生的树往往对训练数据的分类很准确,但对未知的测试数据的分类却没有那么准确,即出现过拟合现象。解决这个问题的办法是考虑决策树的复杂度,对已生成的决策树进行简化,这个过程称为剪枝。

​ 决策树的剪枝往往通过极小化决策树整体的损失函数来实现。一般来说,损失函数可以进行如下的定义:
\(C_a(T)=C(T)+a\left|T\right|\)
​ 其中,T为任意子树, \(C(T)\) 为对训练数据的预测误差(如基尼指数), \(\left|T\right|\) 为子树的叶结点个数, \(a\ge0\) 为参数, \(C_a(T)\) 为参数是 \(a\) 时的子树T的整体损失,参数 \(a\) 权衡训练数据的拟合程度与模型的复杂度。对于固定的 \(a\) ,一定存在使损失函数 \(C_a(T)\) 最小的子树,将其表示为 \(T_a\) 。当 \(a\) 大的时候,最优子树 \(T_a\) 偏小;当 \(a\) 小的时候,最优子树 \(T_a\) 偏大。

示例代码

​ 以iris数据集(https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data)为例进行分析。iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。为了便于理解,我们这里主要用后两个属性(花瓣的长度和宽度)来进行分类。

1. 导入需要的包:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
2. 读取数据,简要分析:

​ 首先根据SparkContext来创建一个SQLContext,其中sc是一个已经存在的SparkContext;然后导入sqlContext.implicits._来实现RDD到Dataframe的隐式转换。

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@10d83860
scala> import sqlContext.implicits._
import sqlContext.implicits._

​ 读取文本文件,第一个map把每行的数据用“,”隔开,每行被分成了5部分,前4部分是鸢尾花的4个特征,最后一部分是鸢尾花的分类。前面说到,这里主要用后两个属性(花瓣的长度和宽度)来进行分类,所以在下一个map中我们获取到这两个属性,存储在Vector中。

scala> val observations=sc.textFile("G:/spark/iris.data").map(_.split(",")).map(p => Vectors.dense(p(2).toDouble, p(3).toDouble))
observations: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[14] at map at <console>:37

​ 这里用case class定义一个schema:Iris,Iris就是需要的数据的结构;然后读取数据,创建一个Iris模式的RDD,然后转化成dataframe;最后调用show()方法来查看一下部分数据。

scala> case class Iris(features: Vector, label: String)
defined class Iris
scala> val data = sc.textFile("G:/spark/iris.data")
        |       .map(_.split(","))
        |       .map(p => Iris(Vectors.dense(p(2).toDouble, p(3).toDouble), p(4).toString()))
        |       .toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]
scala> data.show()
+---------+-----------+
| features|      label|
+---------+-----------+
|[1.4,0.2]|Iris-setosa|
|[1.4,0.2]|Iris-setosa|
|[1.3,0.2]|Iris-setosa|
|[1.5,0.2]|Iris-setosa|
|[1.4,0.2]|Iris-setosa|
|[1.7,0.4]|Iris-setosa|
|[1.4,0.3]|Iris-setosa|
|[1.5,0.2]|Iris-setosa|
|[1.4,0.2]|Iris-setosa|
|[1.5,0.1]|Iris-setosa|
|[1.5,0.2]|Iris-setosa|
|[1.6,0.2]|Iris-setosa|
|[1.4,0.1]|Iris-setosa|
|[1.1,0.1]|Iris-setosa|
|[1.2,0.2]|Iris-setosa|
|[1.5,0.4]|Iris-setosa|
|[1.3,0.4]|Iris-setosa|
|[1.4,0.3]|Iris-setosa|
|[1.7,0.3]|Iris-setosa|
|[1.5,0.3]|Iris-setosa|
+---------+-----------+
only showing top 20 rows
3. 构建ML的pipeline

​ 分别获取标签列和特征列,进行索引,并进行了重命名。

scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_6033e13b0b2b
scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_b5a8adea6903

​ 接下来,把数据集随机分成训练集和测试集,其中训练集占70%。

scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.DataFrame = [features: vector, label: string]

testData: org.apache.spark.sql.DataFrame = [features: vector, label: string]

​ 然后,设置决策树的参数。这里统一用setter的方法来设置,也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。这里设置了用gini指数来进行特征选择,设置树的最大深度为5,具体的可以设置的参数可以通过explainParams()来获取,还能看已经设置的参数的结果。

scala> val dt = new DecisionTreeClassifier()
  |         .setLabelCol("indexedLabel")
  |         .setFeaturesCol("indexedFeatures")
  |         .setImpurity("gini")
  |         .setMaxDepth(5)
dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_16842f2bb6a7
scala> println("DecisionTreeClassifier parameters:\n" + dt.explainParams() + "\n")
DecisionTreeClassifier parameters:
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default:10)
featuresCol: features column name (default: features, current: indexedFeatures)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
labelCol: label column name (default: label, current: indexedLabel)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 5)
maxMemoryInMB: Maximum memory in MB allocated to histogram aggregation. (default: 256)
minInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0)
minInstancesPerNode: Minimum number of instances each child must have after split.  If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1. (default: 1)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
seed: random seed (default: 159147643)
thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold. (undefined)

​ 这里设置一个labelConverter,目的是把预测的类别重新转化成字符型的。

scala> val labelConverter = new IndexToString().
     |       setInputCol("prediction").
     |       setOutputCol("predictedLabel").
     |       setLabels(labelIndexer.labels)
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_5c051ba9ebd3

​ 构建pipeline,设置stage,然后调用fit()来训练模型。

scala> val pipeline = new Pipeline().
     |       setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
pipeline: org.apache.spark.ml.Pipeline = pipeline_eb9649610b0f
// Fit the pipeline to training documents.
scala> val model = pipeline.fit(trainingData)
model: org.apache.spark.ml.PipelineModel = pipeline_eb9649610b0f

​ pipeline本质上是一个Estimator,当pipeline调用fit()的时候就产生了一个PipelineModel,本质上是一个Transformer。然后这个PipelineModel就可以调用transform()来进行预测,生成一个新的DataFrame,即利用训练得到的模型对测试集进行验证。

scala> val predictions = model.transform(testData)
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: string,
indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probabilit
y: vector, prediction: double, predictedLabel: string]

​ 最后可以输出预测的结果,其中select选择要输出的列,collect获取所有行的数据,用foreach把每行打印出来。

scala> predictions.
     |       select("predictedLabel", "label", "features").
     |       collect().
     |       foreach { case Row(predictedLabel: String, label: String, features:
 Vector) =>
     |         println(s"($label, $features) --> predictedLabel=$predictedLabel")
     |     }
(Iris-setosa, [1.2,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.3,0.3]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.3,0.4]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.5,0.1]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.5,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.7,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.7,0.4]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.9,0.2]) --> predictedLabel=Iris-setosa
(Iris-versicolor, [4.0,1.0]) --> predictedLabel=Iris-versicolor
... ... 
4. 模型评估

​ 创建一个MulticlassClassificationEvaluator实例,用setter方法把预测分类的列名和真实分类的列名进行设置;然后计算预测准确率和错误率。

scala> val evaluator = new MulticlassClassificationEvaluator().
     |       setLabelCol("indexedLabel").
     |       setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mc Eval_4651752eb9e9
scala> val accuracy = evaluator.evaluate(predictions)
accuracy: Double = 0.96
scala> println("Test Error = " + (1.0 - accuracy))
Test Error = 0.040000000000000036

​ 从上面可以看到预测的准确性达到96%,接下来可以通过model来获取我们训练得到的决策树模型。前面已经说过model是一个PipelineModel,因此可以通过调用它的stages来获取模型,具体如下:

scala> val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
treeModel: org.apache.spark.ml.classification.DecisionTreeClassificationModel =
DecisionTreeClassificationModel (uid=dtc_16842f2bb6a7) of depth 4 with 13 nodes
scala> println("Learned classification tree model:\n" + treeModel.toDebugString)
Learned classification tree model:
DecisionTreeClassificationModel (uid=dtc_16842f2bb6a7) of depth 4 with 13 nodes
  If (feature 0 <= 1.9)
   Predict: 2.0
  Else (feature 0 > 1.9)
   If (feature 1 <= 1.7)
    If (feature 0 <= 4.9)
     If (feature 1 <= 1.6)
      Predict: 0.0
     Else (feature 1 > 1.6)
      Predict: 1.0
    Else (feature 0 > 4.9)
     If (feature 1 <= 1.6)
      Predict: 1.0
     Else (feature 1 > 1.6)
      Predict: 0.0
   Else (feature 1 > 1.7)
    If (feature 0 <= 4.8)
     Predict: 0.0
    Else (feature 0 > 4.8)
     Predict: 1.0

子雨大数据之Spark入门
扫一扫访问本博客