Spark2.1.0 入门:特征变换–标签和索引的转化  

大数据学习路线图

【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!

[返回Spark教程首页]


返回 Spark MLlib入门教程

在机器学习处理过程中,为了方便相关算法的实现,经常需要把标签数据(一般是字符串)转化成整数索引,或是在计算结束后将整数索引还原为相应的标签。
Spark ML包中提供了几个相关的转换器,例如:StringIndexerIndexToStringOneHotEncoderVectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于org.apache.spark.ml.feature包下。

值得注意的是,用于特征转换的转换器和其他的机器学习算法一样,也属于ML Pipeline模型的一部分,可以用来构成机器学习流水线,以StringIndexer为例,其存储着进行标签数值化过程的相关 超参数,是一个Estimator,对其调用fit(..)方法即可生成相应的模型StringIndexerModel类,很显然,它存储了用于DataFrame进行相关处理的 参数,是一个Transformer(其他转换器也是同一原理)。

由于Spark2.0起,SQLContextHiveContext已经不再推荐使用,改以SparkSession代之,故本文中不再使用SQLContext来进行相关的操作,关于SparkSession的具体详情,这里不再赘述,可以参看Spark2.0的官方文档

Spark2.0以上版本的spark-shell在启动时会自动创建一个名为sparkSparkSession对象,当需要手工创建时,SparkSession可以由其伴生对象的builder()方法创建出来,如下代码段所示:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().
            master("local").
            appName("my App Name").
            getOrCreate()

SQLContext一样,也可以开启RDD的隐式转换:

import spark.implicits._

下面对几个常用的转换器依次进行介绍。

StringIndexer

StringIndexer转换器可以把一列类别型的特征(或标签)进行编码,使其数值化,索引的范围从0开始,该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。

索引构建的顺序为标签的频率,优先编码频率较大的标签,所以出现频率最高的标签为0号。
如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码。

首先,引入必要的包,并创建一个简单的DataFrame,它只包含一个id列和一个标签列category

import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}

scala> val df1 = spark.createDataFrame(Seq(
 |               (0, "a"),
 |               (1, "b"),
 |               (2, "c"),
 |               (3, "a"),
 |               (4, "a"),
 |               (5, "c"))).toDF("id", "category")
df1: org.apache.spark.sql.DataFrame = [id: int, category: string]

随后,我们创建一个StringIndexer对象,设定输入输出列名,其余参数采用默认值,并对这个DataFrame进行训练,产生StringIndexerModel对象:

scala> val indexer = new StringIndexer().
 |       setInputCol("category").
 |       setOutputCol("categoryIndex")
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_95a0a5afdb8b

scala> val model = indexer.fit(df1)
model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_4fa3ca8a82ea

随后即可利用该对象对DataFrame进行转换操作,可以看到,StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,即出现最多的“a”被编号成0,“c”为1,出现最少的“b”为0。

scala> val indexed1 = model.transform(df1)
indexed1: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double]

scala> indexed1.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

考虑这样一种情况,我们使用已有的数据构建了一个StringIndexerModel,然后再构建一个新的DataFrame,这个DataFrame中有着模型内未曾出现的标签“d”,用已有的模型去转换这一DataFrame会有什么效果?

实际上,如果直接转换的话,Spark会抛出异常,报出“Unseen label: d”的错误。

为了处理这种情况,在模型训练后,可以通过设置setHandleInvalid("skip")来忽略掉那些未出现的标签,这样,带有未出现标签的行将直接被过滤掉,所下所示:

scala> val df2 = spark.createDataFrame(Seq(
 |                     (0, "a"),
 |                     (1, "b"),
 |                     (2, "c"),
 |                     (3, "a"),
 |                     (4, "a"),
 |                     (5, "d"))).toDF("id", "category")
df2: org.apache.spark.sql.DataFrame = [id: int, category: string]

scala> val indexed = model.transform(df2)
indexed3: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double]

scala> indexed.show()
org.apache.spark.SparkException: Unseen label: d.

scala> val indexed2 = model.setHandleInvalid("skip").transform(df2)
indexed2: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double]

scala> indexed2.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
+---+--------+-------------+


IndexToString

​与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。

其主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。

首先,和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,然后输出到“categoryIndex”列上,构建出新的DataFrame

scala> val df = spark.createDataFrame(Seq(
 |       (0, "a"),
 |       (1, "b"),
 |       (2, "c"),
 |       (3, "a"),
 |       (4, "a"),
 |       (5, "c")
 |     )).toDF("id", "category")
df: org.apache.spark.sql.DataFrame = [id: int, category: string]

scala> val model = new StringIndexer().
 |       setInputCol("category").
 |       setOutputCol("categoryIndex").
 |       fit(df)
indexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_00fde0fe64d0

scala> val indexed = indexer.transform(df)
indexed: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double]

随后,创建IndexToString对象,读取“categoryIndex”上的标签索引,获得原有数据集的字符型标签,然后再输出到“originalCategory”列上。最后,通过输出“originalCategory”列,可以看到数据集中原有的字符标签。

scala> val converter = new IndexToString().
 |       setInputCol("categoryIndex").
 |       setOutputCol("originalCategory")
converter: org.apache.spark.ml.feature.IndexToString = idxToStr_b95208a0e7ac

scala> val converted = converter.transform(indexed)
converted: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double, originalCategory: string]

scala> converted.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+

OneHotEncoder

​独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。

One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑斯蒂回归等。

首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

scala> val df = spark.createDataFrame(Seq(
 |       (0, "a"),
 |       (1, "b"),
 |       (2, "c"),
 |       (3, "a"),
 |       (4, "a"),
 |       (5, "c"),
 |       (6, "d"),
 |       (7, "d"),
 |       (8, "d"),
 |       (9, "d"),
 |       (10, "e"),
 |       (11, "e"),
 |       (12, "e"),
 |       (13, "e"),
 |       (14, "e")
 |     )).toDF("id", "category")
df: org.apache.spark.sql.DataFrame = [id: int, category: string]

scala> val indexer = new StringIndexer().
 |       setInputCol("category").
 |       setOutputCol("categoryIndex").
 |       fit(df)
indexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_b315cf21d22d

scala> val indexed = indexer.transform(df)
indexed: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double]

随后,我们创建OneHotEncoder对象对处理后的DataFrame进行编码,可以看见,编码后的二进制特征呈稀疏向量形式,与StringIndexer编码的顺序相同,需注意的是最后一个Category("b")被编码为全0向量,若希望"b"也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)

scala> val encoder = new OneHotEncoder().
 |       setInputCol("categoryIndex").
 |       setOutputCol("categoryVec")
encoder: org.apache.spark.ml.feature.OneHotEncoder = oneHot_bbf16821b33a

scala> val encoded = encoder.transform(indexed)
encoded: org.apache.spark.sql.DataFrame = [id: int, category: string, categoryIndex: double, categoryVec: vector]

scala> encoded.show()
+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          2.0|(4,[2],[1.0])|
|  1|       b|          4.0|    (4,[],[])|
|  2|       c|          3.0|(4,[3],[1.0])|
|  3|       a|          2.0|(4,[2],[1.0])|
|  4|       a|          2.0|(4,[2],[1.0])|
|  5|       c|          3.0|(4,[3],[1.0])|
|  6|       d|          1.0|(4,[1],[1.0])|
|  7|       d|          1.0|(4,[1],[1.0])|
|  8|       d|          1.0|(4,[1],[1.0])|
|  9|       d|          1.0|(4,[1],[1.0])|
| 10|       e|          0.0|(4,[0],[1.0])|
| 11|       e|          0.0|(4,[0],[1.0])|
| 12|       e|          0.0|(4,[0],[1.0])|
| 13|       e|          0.0|(4,[0],[1.0])|
| 14|       e|          0.0|(4,[0],[1.0])|
+---+--------+-------------+-------------+

VectorIndexer

之前介绍的StringIndexer是针对单个类别型特征进行转换,倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了VectorIndexer类来解决向量数据集中的类别性特征转换。

通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。

在下面的例子中,我们读入一个数据集,然后使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征,将类别特征转换为索引,这里设置maxCategories为2,即只有种类小于2的特征才被认为是类别型特征,否则被认为是连续型特征:

import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}

scala> val data = Seq(
 |         Vectors.dense(-1.0, 1.0, 1.0),
 |         Vectors.dense(-1.0, 3.0, 1.0),
 |         Vectors.dense(0.0, 5.0, 1.0))
data: Seq[org.apache.spark.ml.linalg.Vector] = List([-1.0,1.0,1.0], [-1.0,3.0,1.0], [0.0,5.0,1.0])

scala> val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
df: org.apache.spark.sql.DataFrame = [features: vector]

scala> val indexer = new VectorIndexer().
 |       setInputCol("features").
 |       setOutputCol("indexed").
 |       setMaxCategories(2)
indexer: org.apache.spark.ml.feature.VectorIndexer = vecIdx_abee81bafba8

scala> val indexerModel = indexer.fit(df)
indexerModel: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_abee81bafba8

可以通过VectorIndexerModelcategoryMaps成员来获得被转换的特征及其映射,这里可以看到共有两个特征被转换,分别是0号和2号。

scala> val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
categoricalFeatures: Set[Int] = Set(0, 2)

scala> println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", "))
Chose 2 categorical features: 0, 2

可以看到,0号特征只有-1,0两种取值,分别被映射成0,1,而2号特征只有1种取值,被映射成0。

scala> val indexed = indexerModel.transform(df)
indexed: org.apache.spark.sql.DataFrame = [features: vector, indexed: vector]

scala> indexed.show()
+--------------+-------------+
|      features|      indexed|
+--------------+-------------+
|[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
|[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
| [0.0,5.0,1.0]|[0.0,5.0,0.0]|
+--------------+-------------+



子雨大数据之Spark入门
扫一扫访问本博客