代码-第9章 Spark-MLlib-林子雨编著《Spark编程基础(Python版,第2版)》

大数据学习路线图

厦门大学林子雨编著《Spark编程基础(Python版,第2版)》教材中的命令行和代码(教材官网
提供了教材中的所有章节的命令行和代码,可以直接复制粘贴去执行。
查看《Spark编程基础(Python版,第2版)》教材中的所有命令行和代码

>>> from pyspark.ml.linalg import Vectors
# 创建一个稠密向量
>>> dv = Vectors.dense([2.0, 0.0, 8.0])
# 创建一个稀疏向量
# 方法的第2个参数(数组)指定了非0元素的索引,而第3个参数(数组)则给出了非零元素的值
>>> sv1 = Vectors.sparse(3, [0, 2], [2.0, 8.0])
# 另一种创建稀疏向量的方法
# 方法的第2个参数是一个序列,其中每个元素都是一个非零值的元组:(index,elem)
>>> sv2 = Vectors.sparse(3, [(0, 2.0), (2, 8.0)])
>>> print("稠密向量 dv:", dv)
稠密向量 dv: [2.0,0.0,8.0]
>>> print("稀疏向量 sv1:", sv1)
稀疏向量 sv1: (3,[0,2],[2.0,8.0])
>>> print("稀疏向量 sv2:", sv2)
稀疏向量 sv2: (3,[0,2],[2.0,8.0])
>>> from pyspark.mllib.regression import LabeledPoint #引入必要的包
>>> from pyspark.mllib.linalg import Vectors
>>> from pyspark.sql.session import SparkSession

# 创建一个标签为1.0(分类中可视为正样本)的稠密向量标注点
>>> pos = LabeledPoint(1.0, Vectors.dense([2.0, 0.0, 8.0]))

# 创建一个标签为0.0(分类中可视为负样本)的稀疏向量标注点
>>> neg = LabeledPoint(0.0, Vectors.sparse(3, [0, 2], [2.0, 8.0]))

>>> print("Positive Example:", pos)
Positive Example: (1.0,[2.0,0.0,8.0])
>>> print("Negative Example:", neg)
Negative Example: (0.0,(3,[0,2],[2.0,8.0]))
# 从文件加载LIBSVM格式的数据
>>> path = "file:///usr/local/spark/data/mllib/sample_libsvm_data.txt"
# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()
>>> examples = spark.read.format("libsvm").load(path)
>>> examples.head()
>>> from pyspark.ml.linalg import Matrices

# 创建一个3行2列的稠密矩阵[ [1.0, 2.0], [3.0, 4.0], [5.0, 6.0] ]
# 注意,这里的数组参数是列优先的,即按照列的方式从数组中提取元素
>>> dm = Matrices.dense(3, 2, [1.0, 3.0, 5.0, 2.0, 4.0, 6.0])

>>> print(dm)
# 创建一个3行2列的稀疏矩阵[ [9.0,0.0], [0.0,8.0], [0.0,6.0]]
# 第1个数组参数表示列指针,即每一列元素的开始索引
# 第2个数组参数表示行索引,即对应的元素属于哪一行
# 第3个数组参数即按列优先排列的所有非零元素,通过列指针和行索引即可判断每个元素所在的位置
>>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 6.0, 8.0])

>>> print(sm)
第1步:读取Spark自带的图像数据源中数据。
# 从文件加载图像数据
# 这里假设您的图像数据位于 "file:///usr/local/spark/data/mllib/images/origin/kittens" 路径下
>>> from pyspark.sql import SparkSession
# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()
>>> df = spark.read.format("image") \
    .option("dropInvalid", True) \
    .load("file:///usr/local/spark/data/mllib/images/origin/kittens")
第2步:输出image列的origin 、width和height属性值。
# 选择DataFrame中的图像属性
>>> selected_df = df.select("image.origin", "image.width", "image.height")

# 显示图像属性,不截断显示
>>> selected_df.show(truncate=False)
第1步:读取Spark自带的LIBSVM数据源中数据。
>>> df = spark.read.format("libsvm") \
    .option("numFeatures", "780") \
    .load("file:///usr/local/spark/data/mllib/sample_libsvm_data.txt")
第2步:输出结果。
>>> df.show(10)
第1步:导入相关性方法所需要的包。
>>> from pyspark.sql import SparkSession
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.sql import Row
>>> from pyspark.ml.stat import Correlation
第2步:创建实验数据,并转化成DataFrame。
# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 示例数据,包含稀疏向量和稠密向量
>>> data = [
    Vectors.sparse(4, [(0, 2.0), (2, -1.0)]),
    Vectors.dense([3.0, 0.0, 4.0, 5.0]),
    Vectors.dense([6.0, 8.0, 0.0, 7.0])
]

# 将数据集转换为 DataFrame
>>> rows = [Row(features=vec) for vec in data]
>>> df = spark.createDataFrame(rows)

# 打印 DataFrame
>>> df.show()
+--------------------+
|            features|
+--------------------+
|(4,[0,2],[2.0,-1.0])|
|   [3.0,0.0,4.0,5.0]|
|   [6.0,8.0,0.0,7.0]|
+--------------------+
第3步:调用Correlation包中的corr( )函数来获取皮尔逊相关系数。
# 计算特征列的相关性矩阵
>>> corr_matrix = Correlation.corr(df, "features")

# 提取相关性矩阵的值
>>> coeff1 = corr_matrix.collect()[0]["pearson(features)"]

>>> print(coeff1)
DenseMatrix([[ 1.        ,  0.97072534, -0.09078413,  0.8660254 ],
             [ 0.97072534,  1.        , -0.32732684,  0.72057669],
             [-0.09078413, -0.32732684,  1.        ,  0.41931393],
             [ 0.8660254 ,  0.72057669,  0.41931393,  1.        ]])
第4步:使用指定的斯皮尔曼相关系数计算输入数据集的相关性。
# 计算特征列的Spearman相关性矩阵
>>> corr_matrix_spearman = Correlation.corr(df, "features", "spearman")

# 提取Spearman相关性矩阵的值
>>> coeff2 = corr_matrix_spearman.collect()[0]["spearman(features)"]

>>> print(coeff2)
DenseMatrix([[1.       , 0.8660254, 0.5      , 1.       ],
             [0.8660254, 1.       , 0.       , 0.8660254],
             [0.5      , 0.       , 1.       , 0.5      ],
             [1.       , 0.8660254, 0.5      , 1.       ]])
第1步:导入卡方检验所需要的包。
>>> from pyspark.ml.linalg import Vector, Vectors
>>> from pyspark.ml.stat import ChiSquareTest
第2步:创建实验数据集,该数据集具有5个样本、2个特征维度,标签有1.0和0.0两种。
# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()
# 创建数据集
>>> data = [
    (0.0, Vectors.dense([3.5, 40.0])),
    (0.0, Vectors.dense([3.5, 30.0])),
    (1.0, Vectors.dense([1.5, 30.0])),
    (0.0, Vectors.dense([1.5, 20.0])),
    (0.0, Vectors.dense([0.5, 10.0]))
]

# 将数据集转换为DataFrame
>>> df = spark.createDataFrame([Row(label=label, features=vec) for label, vec in data])
第3步:调用ChiSquareTest包中的test( )函数,将(特征,标签)转换成一个列联矩阵,计算卡方检验的统计值。这里要求所有的标签和特征值必须是分类的。
# 执行卡方独立性检验
>>> chi_square_result = ChiSquareTest.test(df, "features", "label")

>>> chi_square_result.show()
+--------------------+----------------+--------------------+
|             pValues|degreesOfFreedom|          statistics|
+--------------------+----------------+--------------------+
|[0.39160562667679...|          [2, 3]|[1.87500000000000...|
+--------------------+----------------+--------------------+
第4步:分别获取卡方分布右尾概率、自由度、卡方检验的统计值。
# 打印p-values
>>> print("p-values:")
>>> chi_square_result.select("pValues").show(truncate=False)
p-values:
+---------------------------------------+
|pValues                                |
+---------------------------------------+
|[0.3916056266767989,0.5987516330675617]|
+---------------------------------------+
# 打印degreesOfFreedom
>>> print("Degrees of Freedom:")
>>> chi_square_result.select("degreesOfFreedom").show(truncate=False)
Degrees of Freedom:
+----------------+
|degreesOfFreedom|
+----------------+
|[2, 3]          |
+----------------+

# 打印statistics
>>> print("Statistics:")
>>> chi_square_result.select("statistics").show(truncate=False)
Statistics:
+--------------------------+
|statistics                |
+--------------------------+
|[1.8750000000000002,1.875]|
+--------------------------+
第1步:导入汇总统计所需要的包。
>>> from pyspark.ml.linalg import Vector, Vectors
>>> from pyspark.ml.stat import Summarizer
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import col
第2步:创建实验数据,两组数据的权重分别为1.0和3.0,权重之和为4.0。
# 创建一个 SparkSession
>>> spark = SparkSession.builder.appName("example").getOrCreate()
# 创建数据集
>>> data = [
    (Vectors.dense([1.0, 2.0, 4.0]), 1.0),
    (Vectors.dense([4.0, 3.0, 6.0]), 3.0)
]
# 将数据集转换为 DataFrame
>>> columns = ["features", "weight"]
>>> df = spark.createDataFrame(data, columns)
# 显示数据集
>>> df.show()
+-------------+------+
|     features|weight|
+-------------+------+
|[1.0,2.0,4.0]|   1.0|
|[4.0,3.0,6.0]|   3.0|
+-------------+------+
第3步:计算得到数据的加权平均值和加权方差。
>>> summary = df.select(Summarizer.metrics("mean", "variance")
                    .summary(col("features"), col("weight")).alias("summary"))
>>> meanVal = summary.select("summary.mean").first()[0]
>>> varianceVal = summary.select("summary.variance").first()[0]

>>> print("Mean Values:", meanVal)
Mean Values: [3.25,2.75,5.5]
>>> print("Variance Values:", varianceVal)
Variance Values: [4.5,0.5,2.0]
第4步:计算得到数据无权重下的平均值和方差。
>>> summary = df.select(Summarizer.metrics("mean", "variance")
                    .summary(col("features")).alias("summary"))
>>> meanVal = summary.select("summary.mean").first()[0]
>>> varianceVal = summary.select("summary.variance").first()[0]

>>> print("平均值:", meanVal)
平均值: [2.5,2.5,5.0]
>>> print("方差:", varianceVal)
方差: [4.5,0.5,2.0]
第1步:导入TF-IDF所需要的包。
>>> from pyspark.ml.feature import HashingTF, IDF, Tokenizer
>>> from pyspark.sql import SparkSession
第2步:创建一个集合,每一个句子代表一个文件。
# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 创建数据集
>>> data = [
    (0, "I heard about Spark and I love Spark"),
  (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
]

# 将数据集转换为DataFrame
>>> sentence_data = spark.createDataFrame(data, ["label", "sentence"])

# 显示DataFrame
>>> sentence_data.show()
+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    0|I heard about Spa...|
|    0|I wish Java could...|
|    1|Logistic regressi...|
+-----+--------------------+
第3步:用Tokenizer把每个句子分解成单词。
# 创建Tokenizer对象并设置输入列和输出列
>>> tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

# 使用Tokenizer拆分句子列
>>> words_data = tokenizer.transform(sentence_data)

# 显示DataFrame,不截断显示
>>> words_data.show(truncate=False)

+-----+------------------------------------+-------------------------------------
|label|sentence                            |words                                        |
+-----+------------------------------------+-------------------------------------
|0    |I heard about Spark and I love Spark|[i, heard, about, spark, and, i, love, spark]|
|0    |I wish Java could use case classes  |[i, wish, java, could, use, case, classes]   
|1    |Logistic regression models are neat |[logistic, regression, models, are, neat]    
+-----+------------------------------------+-------------------------------------
从输出结果可以看出,Tokenizer的transform( )方法把每个句子拆分成多个单词,这些单词构成一个“词袋”(里面装了很多个单词)。
第4步:用HashingTF的transform( )方法把每个“词袋”都哈希成特征向量。这里设置哈希表的桶数为2000。
# 创建HashingTF对象并设置输入列、输出列以及特征数量
>>> hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2000)

# 使用HashingTF将单词列转换为特征向量
>>> featurized_data = hashing_tf.transform(words_data)

# 显示DataFrame,不截断显示
>>> featurized_data.select("words", "rawFeatures").show(truncate=False)
第5步:调用IDF( )方法来重新构造特征向量的规模,生成的变量idf是一个评估器,在特征向量上应用它的fit( )方法,会产生一个IDFModel(名称为“idfModel”)。
# 创建IDF对象并设置输入列和输出列
>>> idf = IDF(inputCol="rawFeatures", outputCol="features")

# 拟合数据以创建IDF模型
>>> idf_model = idf.fit(featurized_data)
第6步:调用IDFModel的transform( )方法,得到每一个单词对应的TF-IDF度量值。
# 使用IDF模型转换数据
>>> rescaled_data = idf_model.transform(featurized_data)

# 显示DataFrame中的特征列和标签列
>>> rescaled_data.select("features", "label").show(truncate=False)
>>> from pyspark.ml.feature import StringIndexer, IndexToString
>>> from pyspark.sql import SparkSession
其次,构建1个DataFrame,设置StringIndexer的输入列和输出列的名称。
# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 创建数据集
>>> data = [
    (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")
]

# 将数据集转换为DataFrame
>>> df1 = spark.createDataFrame(data, ["id", "category"])

# 创建StringIndexer对象并设置输入列和输出列
>>> indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
这里首先用StringIndexer读取数据集中的category列,把字符串型标签转化成标签索引,然后输出到categoryIndex列上。最后,通过fit( )方法进行模型训练,用训练出的模型对原数据集进行处理,并通过indexed.show( )进行展示。
# 对DataFrame进行索引
>>> indexed_df = indexer.fit(df1).transform(df1)

# 显示DataFrame
>>> indexed_df.show()
# 创建一个 IndexToString 转换器
>>> index_to_string = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")

# 使用转换器将索引列转换回原始类别列
>>> string_data = index_to_string.transform(indexed_df)

# 显示DataFrame中的"id"和"originalCategory"列
>>> string_data.select("id", "originalCategory").show()
>>> from pyspark.ml.feature import VectorIndexer
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.sql import SparkSession

# 创建Spark会话
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 创建示例数据
>>> data = [Vectors.dense(-1.0, 1.0, 1.0),
        Vectors.dense(-1.0, 3.0, 1.0),
        Vectors.dense(0.0, 5.0, 1.0)]

# 创建DataFrame并指定列名
>>> df = spark.createDataFrame([(vector,) for vector in data], ["features"])
然后,构建VectorIndexer转换器,设置输入列和输出列,并进行模型训练。
# 创建VectorIndexer对象并设置输入列、输出列以及最大不同特征值的数量
>>> vector_indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2)

>>> indexerModel = vector_indexer.fit(df)
>>> indexedData = indexerModel.transform(df)

>>> indexedData.show()
>>> categorical_features = set(indexerModel.categoryMaps.keys())

# 打印分类特征信息
>>> print(f"Chose {len(categorical_features)} categorical features: {', '.join(map(str, categorical_features))}")
Chose 2 categorical features: 0, 2
最后,把模型应用于原有的数据,并输出结果。
# 使用VectorIndexerModel对DataFrame进行特征列索引
>>> indexed = indexerModel.transform(df)

# 显示DataFrame
>>> indexed.show()
首先,进行环境的设置,引入卡方选择所需要使用的类。
>>> from pyspark.sql import SparkSession
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.sql import Row
>>> from pyspark.ml.feature import ChiSqSelector
其次,创建实验数据,这是一个具有3个样本、4个特征维度的数据集,标签有1和0两种,我们将在此数据集上进行卡方选择。
# 创建Spark会话
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 创建数据集
>>> data = [
    (1, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1),
    (2, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0),
    (3, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0)
]

# 将数据集转换为DataFrame
>>> df = spark.createDataFrame([Row(id=id, features=vec, label=label) for id, vec, label in data])
>>> df.show()
+---+------------------+-----+
| id|          features|label|
+---+------------------+-----+
|  1|[0.0,0.0,18.0,1.0]|    1|
|  2|[0.0,1.0,12.0,0.0]|    0|
|  3|[1.0,0.0,15.0,0.1]|    0|
+---+------------------+-----+
然后,用卡方选择进行特征选择器的训练,为了便于观察,我们设置只选择和标签关联性最强的一个特征(可以通过setNumTopFeatures( )方法进行设置)。
# 创建ChiSqSelector对象并设置属性
>>> selector = ChiSqSelector(
    numTopFeatures=1,
    featuresCol="features",
    labelCol="label",
    outputCol="selected-feature"
)
最后,用训练出的模型对原数据集进行处理,可以看到,第3列特征作为最有用的特征列被选出。
# 拟合ChiSqSelector模型
>>> selector_model = selector.fit(df)

# 使用模型进行特征选择
>>> result = selector_model.transform(df)

# 显示DataFrame,不截断显示
>>> result.show(truncate=False)
第1步:导入本地向量Vector和Vectors,导入所需要的类。
>>> from pyspark.ml.linalg import Vector, Vectors
>>> from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml import Pipeline, PipelineModel
>>> from pyspark.sql import Row
>>> from pyspark.ml.classification import LogisticRegressionModel
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.feature import VectorAssembler
>>> from pyspark.sql.types import DoubleType
第2步:使用spark.read.csv( )方法读取CSV文件,再将4个特征转化为Double类型数据(转化为Double类型数据是为了适应文件末尾存在空行等情况),最后利用VectorAssembler( )将4个特征组合成向量。
# 指定CSV文件路径
>>> path = "file:///usr/local/spark/iris.data"

# 创建一个 SparkSession(如果尚未创建)
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 使用Spark读取CSV文件并推断列的数据类型,然后重命名列
>>> df_raw = spark.read.option("inferSchema", "true").csv(path).toDF("c0", "c1", "c2", "c3", "label")

# 将列的数据类型转换为Double
>>> df_double = df_raw.select(
            df_raw["c0"].cast(DoubleType()),
            df_raw["c1"].cast(DoubleType()),
            df_raw["c2"].cast(DoubleType()),
        df_raw["c3"].cast(DoubleType()),
            df_raw["label"]
)

# 创建VectorAssembler并设置输入列和输出列
>>> assembler = VectorAssembler(inputCols=["c0", "c1", "c2", "c3"], outputCol="features")

# 使用VectorAssembler将特征列组装成特征向量
>>> data = assembler.transform(df_double).select("features", "label")

# 显示DataFrame
>>> data.show()
第3步:分别获取标签列和特征列,进行索引并进行重命名。
# 使用StringIndexer对标签列进行索引
>>> label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# 使用VectorIndexer对特征列进行索引
>>> feature_indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(data)
第4步:设置LogisticRegression( )的参数。这里设置循环次数为100次、规范化项为0.3等。具体可以设置的参数,读者可以通过explainParams( )来获取,还能看到程序已经设置的参数的结果。
# 创建LogisticRegression模型并设置参数
>>> lr = LogisticRegression(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    maxIter=100,
    regParam=0.3,
    elasticNetParam=0.8
)

# 打印LogisticRegression模型的参数说明
>>> print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
第5步:设置一个IndexToString转换器,把预测的类别重新转化成字符串型的。构建一个机器学习流水线,设置各个阶段。上一个阶段的输出将作为本阶段的输入。
# 创建IndexToString转换器并设置输入列、输出列以及标签
>>> label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=label_indexer.labels)

# 创建Pipeline,并将每个阶段添加到Pipeline中
>>> lr_pipeline = Pipeline(stages=[label_indexer, feature_indexer, lr, label_converter])
第6步:把数据集随机分成训练集和测试集,其中训练集占70%。Pipeline本质上是一个评估器,当Pipeline调用fit( )的时候就产生了一个PipelineModel,它是一个转换器。然后,这个PipelineModel就可以通过调用transform来进行预测,生成一个新的DataFrame,即利用训练得到的模型对测试集进行验证。
# 将数据拆分为训练集和测试集
>>> training_data, test_data = data.randomSplit([0.7, 0.3])

# 拟合Pipeline模型
>>> lr_pipeline_model = lr_pipeline.fit(training_data)
>>> lr_predictions = lr_pipeline_model.transform(test_data)
第7步:输出预测的结果。用select( )选择要输出的列,用collect( )获取所有行的数据,用for循环把每行都输出。
# 选择要显示的列,并遍历预测结果
>>> lr_predictions.select("predictedLabel", "label", "features", "probability").collect()
>>> for row in lr_predictions.collect():
            predicted_label = row.predictedLabel
            label = row.label
            features = row.features
            prob = row.probability
        print(f"({label}, {features}) --> prob={prob}, predicted
第8步:对训练的模型进行评估。创建一个MulticlassClassificationEvaluator实例,用setter( )方法对预测分类的列名和真实分类的列名进行设置,然后计算预测的准确率。
# 创建MulticlassClassificationEvaluator并设置标签列和预测列
>>> evaluator = MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel").setPredictionCol("prediction")

# 计算模型的准确性
>>> lr_accuracy = evaluator.evaluate(lr_predictions)
>>> print("Logistic Regression Accuracy:", lr_accuracy)
Logistic Regression Accuracy: 0.8390640167577786
从上面的结果中可以看到,预测的准确率约为0.839064。
第9步:通过Pipeline Model来获取训练得到的逻辑斯谛模型。lrPipelineModel是一个PipelineModel,因此,可以通过调用它的stages( )方法来获取模型,详细代码如下。
# 获取LogisticRegression模型
>>> lr_model = lr_pipeline_model.stages[2]

# 打印模型的系数矩阵、截距、类别数和特征数
>>> print("Coefficients: \n", lr_model.coefficientMatrix)
>>> print("Intercept: ", lr_model.interceptVector)
>>> print("numClasses: ", lr_model.numClasses)
>>> print("numFeatures: ", lr_model.numFeatures)
第1步:导入需要的包。
>>> from pyspark.ml.classification import DecisionTreeClassifier
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.feature import VectorAssembler
>>> from pyspark.sql.types import DoubleType
>>> from pyspark.sql.session import SparkSession
第2步:用case类定义一个数据类Iris,创建一个Iris模式的RDD并转化成DataFrame。
# 创建SparkSession
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 指定CSV文件路径
>>> path = "file:///usr/local/spark/iris.data"

# 使用Spark读取CSV文件并推断列的数据类型,然后重命名列
>>> df_raw = spark.read.option("inferSchema", "true").csv(path).toDF("c0", "c1", "c2", "c3", "label")

# 将列的数据类型转换为Double
>>> df_double = df_raw.select(
            df_raw["c0"].cast(DoubleType()),
            df_raw["c1"].cast(DoubleType()),
            df_raw["c2"].cast(DoubleType()),
        df_raw["c3"].cast(DoubleType()),
            df_raw["label"]
)

# 创建VectorAssembler并设置输入列和输出列
>>> assembler = VectorAssembler(inputCols=["c0", "c1", "c2", "c3"], outputCol="features")

# 使用VectorAssembler将特征列组装成特征向量
>>> df = assembler.transform(df_double).select("features", "label")
第3步:进一步处理特征和标签,把数据集随机分成训练集和测试集,其中训练集占70%。
# 创建StringIndexer用于标签列
>>> labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)

# 创建VectorIndexer用于特征列
>>> featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)

# 创建IndexToString用于将预测的标签转换回原始标签
>>> labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# 随机拆分数据集为训练集和测试集
>>> trainingData, testData = df.randomSplit([0.7, 0.3])
第4步:创建决策树模型(DecisionTreeClassifier( )),通过setter( )方法来设置决策树的参数,也可以用ParamMap( )来设置。这里仅需要设置特征列FeaturesCol和待预测列LabelCol。具体可以设置的参数可以通过explainParams( )来获取。
# 创建DecisionTreeClassifier
>>> dtClassifier=DecisionTreeClassifier()
.setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
第5步:构建机器学习流水线,在训练数据集上调用fit( )进行模型训练,并在测试数据集上调用transform( )方法进行预测。
# 创建Pipeline,设置阶段
>>> dtPipeline = Pipeline(stages=[labelIndexer, featureIndexer, dtClassifier, labelConverter])
# 训练Pipeline模型
>>> dtPipelineModel = dtPipeline.fit(trainingData)

# 进行预测
>>> dtPredictions = dtPipelineModel.transform(testData)

# 选择需要展示的列
>>> selected = dtPredictions.select("predictedLabel", "label", "features")

# 显示前100行
>>> selected.show(100)
# 创建多类别分类评估器并计算准确率
>>> evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction")
>>> dtAccuracy = evaluator.evaluate(dtPredictions)
>>> print("Model Accuracy:", dtAccuracy)
Model Accuracy: 0.9575250312891115  //模型的预测准确率
第6步:通过调用DecisionTreeClassificationModel的toDebugString( )方法,查看训练的决策树模型结构。
# 获取DecisionTreeClassificationModel
>>> treeModelClassifier = dtPipelineModel.stages[2]

# 打印学习到的分类树模型
>>> print("Learned classification tree model:\n" + treeModelClassifier.toDebugString)
第1步:引入必要的类。
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.clustering import KMeans, KMeansModel
>>> from pyspark.ml.evaluation import ClusteringEvaluator
>>> from pyspark.ml.feature import VectorAssembler
>>> from pyspark.sql.types import DoubleType
>>> from pyspark.sql.session import SparkSession
第2步:创建数据集。使用spark.read.csv( )方法读取CSV文件,再将4个属性转化为Double类型,最后利用VectorAssembler( )将4个属性组合成向量。
# 创建 SparkSession
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 加载数据
>>> path = "file:///usr/local/spark/iris.data"
>>> df_raw = spark.read.option("inferSchema", "true").csv(path).toDF("c0", "c1", "c2", "c3", "label")

# 将列的数据类型转换为Double
>>> df_double = df_raw.select(
            df_raw["c0"].cast(DoubleType()),
            df_raw["c1"].cast(DoubleType()),
            df_raw["c2"].cast(DoubleType()),
        df_raw["c3"].cast(DoubleType()),
            df_raw["label"]
)

# 创建 VectorAssembler
>>> assembler = VectorAssembler(inputCols=["c0", "c1", "c2", "c3"], outputCol="features")

# 使用 VectorAssembler 转换数据
>>> df = assembler.transform(df_double).select("features")
第3步:数据构建好后,即可创建Kmeans实例,并进行参数设置。
# 创建 KMeans 模型
>>> kmeans = KMeans().setK(3)  # 设置簇的数量
>>> kmeans.setFeaturesCol("features")  # 设置特征列
>>> kmeans.setPredictionCol("prediction")  # 设置预测列

# 训练 KMeans 模型
>>> kmeansmodel = kmeans.fit(df)
第4步:通过transform( )方法对存储在df中的数据集进行整体处理,生成带有预测簇标签的数据集。
# 进行聚类预测
>>> results = kmeansmodel.transform(df)

# 打印结果
>>> for result in results.collect():
        print(str(result.features) + " => cluster " + str(result.prediction))
第5步:通过KMeansModel类自带的clusterCenters属性获取模型的所有划分中心情况。
# 打印 KMeans 模型的聚类中心
>>> for center in kmeansmodel.clusterCenters():
        print("Clustering Center: {}".format(center))
第6步:使用spark.ml.evaluation.ClusteringEvaluator( )计算Silhouette分数来度量聚类的有效性,该值属于区间[-1,1],且越接近1表示簇内样本距离越小,不属于同一簇的样本距离越大。在K值未知的情况下,可利用该值选取合适的K值。
# 创建聚类评估器
>>> evaluator = ClusteringEvaluator()

# 计算轮廓系数
>>> silhouette = evaluator.evaluate(results)
>>> print("Silhouette Score:", silhouette)
第1步:引入需要的包。GMM在spark.ml.clustering包下,具体实现分为两个类:用于抽象GMM的超参数并进行训练的GaussianMixture类和训练后的模型GaussianMixtureModel类。
>>> from pyspark.ml.clustering import GaussianMixture, GaussianMixtureModel
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.sql.session import SparkSession
第2步:创建数据集。使用spark.read.csv( )方法读取CSV文件,再将4个属性转化为Double类型数据,最后利用VectorAssembler( )将4个属性组合成向量。
# 创建 SparkSession
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 加载数据
>>> path = "file:///usr/local/spark/iris.data"
>>> df_raw = spark.read.option("inferSchema", "true").csv(path).toDF("c0", "c1", "c2", "c3", "label")

# 将列的数据类型转换为Double
>>> df_double = df_raw.select(
            df_raw["c0"].cast(DoubleType()),
            df_raw["c1"].cast(DoubleType()),
            df_raw["c2"].cast(DoubleType()),
            df_raw["c3"].cast(DoubleType()),
            df_raw["label"]
)

# 创建 VectorAssembler
>>> assembler = VectorAssembler(inputCols=["c0", "c1", "c2", "c3"], outputCol="features")

# 使用 VectorAssembler 转换数据
>>> df = assembler.transform(df_double).select("features")
第3步:数据构建好后,即可创建一个GaussianMixture对象,设置相应的超参数,并调用fit( )方法来训练一个GMM模型GaussianMixture。
# 创建 Gaussian Mixture 模型
>>> gm = GaussianMixture().setK(3).setPredictionCol("Prediction")
.setProbabilityCol("Probability")

# 训练模型
>>> gmm = gm.fit(df)
第4步:调用transform( )方法处理数据集并进行输出。除了可以得到样本的聚簇归属预测,GMM还可以得到样本属于各个聚簇的概率(Probability列)。
# 使用 Gaussian Mixture 模型进行聚类预测
>>> result = gmm.transform(df)

# 显示结果
>>> result.show(150, truncate=False)
第5步:得到模型后即可查看模型的相关参数。与K-Means不同,GMM不直接给出划分中心,而是给出各个混合成分(多元高斯分布)的参数。GaussianMixtureModel类的weights成员获取各个混合成分的权重,gaussians成员获取各个混合成分。其中,GMM的每一个混合成分都使用一个MultivariateGaussian类(位于pyspark.ml.stat包中)来存储,可以通过gaussians成员来获取各个混合成分的参数(均值向量和协方差矩阵)。
# 获取聚类簇数
>>> k = gmm.getK()

# 打印每个组件的权重、均值向量和协方差矩阵
>>> for i in range(k):
            print("Component {}: ".format(i))
            print("Weight: {}".format(gmm.weights[i]))
            print("Mu Vector: \n{}".format(gmm.gaussians[i].mean))
            print("Sigma Matrix: \n{}".format(gmm.gaussians[i].cov))
第1步:引入需要的包。
>>> from pyspark.ml.fpm import FPGrowth
>>> from pyspark.sql import SparkSession
第2步:读取sample_fpgrowth数据集中的每一行,并用空格分割其中的项目,将分割出的项目转化成列表。
# 创建 SparkSession
>>> spark = SparkSession.builder \
            .appName("example") \
            .getOrCreate()

# 加载文本数据并创建 DataFrame
>>> data = spark.sparkContext.textFile
("file:///usr/local/spark/data/mllib/sample_fpgrowth.txt")
    .map(lambda t: (t.split(" "),))
    .toDF(["items"])

# 显示 DataFrame
>>> data.show(truncate=False)
第3步:数据构建好后,即可创建FP-Growth模型,并进行参数设置。
# 创建 FPGrowth 模型
>>> fpgrowth = FPGrowth()
.setItemsCol("items").setMinSupport(0.5).setMinConfidence(0.6)

# 拟合模型到数据集
>>> model = fpgrowth.fit(data)
第4步:输出频繁模式集。
>>> model.freqItemsets.show()
第5步:输出生成的关联规则。
>>> model.associationRules.show()
第6步:对输入交易应用生成的关联规则,并将结果作为预测值输出。
>>> model.transform(data).show()
第1步:引入需要的包。
>>> from pyspark.ml.fpm import PrefixSpan
>>> from pyspark.sql import SparkSession
第2步:构造输入数据。
# 创建 SparkSession
>>> spark = SparkSession.builder \
    .appName("example") \
    .getOrCreate()

# 定义小型测试数据
>>> small_test_data = [
            [[2], [1], [3]],
            [[3], [1, 2]],
            [[1], [2, 3], [2, 4]],
            [[3], [5]]
]

# 转换为 DataFrame
>>> data = [(seq,) for seq in small_test_data]
>>> df = spark.createDataFrame(data, ["sequence"])
第3步:创建PrefixSpan模型,并进行参数设置。
# 创建 PrefixSpan 模型
model = PrefixSpan() \
    .setMinSupport(0.5) \
    .setMaxPatternLength(5) \
    .setMaxLocalProjDBSize(32000000)
第4步:查找频繁序列模式。
>>> result = model.findFrequentSequentialPatterns(df)
第5步:输出结果。
>>> result.show()
第1步:引入需要的包。
>>> from pyspark.ml.evaluation import RegressionEvaluator
>>> from pyspark.ml.recommendation import ALS
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import Row
第2步:创建一个Rating类和parseRating( )函数。parseRating( )用于把读取的MovieLens数据集中的每一行转化成Rating类的对象。
# 创建Spark会话
>>> spark = SparkSession.builder.appName("example").getOrCreate()

# 定义Rating类
>>> class Rating:
            def __init__(self, userId, movieId, rating, timestamp):
                self.userId = userId
                self.movieId = movieId
                self.rating = rating
                self.timestamp = timestamp

# 定义解析函数
>>> def parseRating(line):
            fields = line.split("::")
            assert len(fields) == 4
            return Rating(int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

# 读取文件并转换为DataFrame
>>> ratings = spark.sparkContext
.textFile("file:///usr/local/spark/data/mllib/als/sample_movielens_ratings.txt") \
    .map(parseRating) \
    .map(lambda x: Row(userId=x.userId, movieId=x.movieId, rating=x.rating, timestamp=x.timestamp)).toDF()

# 显示DataFrame的前几行数据
>>> ratings.show()
第3步:把MovieLens数据集划分为训练集和测试集,其中训练集占80%,测试集占20%。
# 将数据集分割为训练集和测试集
>>> training, test = ratings.randomSplit([0.8, 0.2])
第4步:使用ALS来建立推荐模型。这里构建两个模型,一个是显性反馈模型,另一个是隐性反馈模型。
# 创建显式评分的ALS模型
>>> alsExplicit = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

# 创建隐式评分的ALS模型
>>> alsImplicit = ALS(maxIter=5, regParam=0.01, implicitPrefs=True, userCol="userId", itemCol="movieId", ratingCol="rating")
第5步:把推荐模型放在训练数据上训练。
# 使用训练数据拟合显式评分的ALS模型
>>> modelExplicit = alsExplicit.fit(training)
# 使用训练数据拟合隐式评分的ALS模型
>>> modelImplicit = alsImplicit.fit(training)
第6步:对测试集中的用户-电影进行预测,得到预测评分的数据集。
# 使用显式评分的ALS模型进行预测并移除NaN值
>>> predictionsExplicit = modelExplicit.transform(test).na.drop()

# 使用隐式评分的ALS模型进行预测并移除NaN值
>>> predictionsImplicit = modelImplicit.transform(test).na.drop()
测试集中如果出现训练集中没有出现的用户,则此次算法将无法进行推荐和评分预测。因此,na.drop( )将删除modelExplicit.transform(test)返回结果的DataFrame中任何出现空值或NaN的行。
第7步:把结果输出,对比一下真实结果与预测结果。
>>> predictionsExplicit.show()
>>> predictionsImplicit.show()
第8步:通过计算模型的均方根误差来对模型进行评估。均方根误差越小,模型越准确。
# 创建回归评估器
>>> evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# 使用评估器计算显式评分模型的RMSE
>>> rmseExplicit = evaluator.evaluate(predictionsExplicit)

# 使用评估器计算隐式评分模型的RMSE
>>> rmseImplicit = evaluator.evaluate(predictionsImplicit)

>>> print("RMSE for Explicit Model: ", rmseExplicit)
RMSE for Explicit Model:  1.7802848438813463
>>> print("RMSE for Implicit Model: ", rmseImplicit)
RMSE for Implicit Model:  1.8752274536739975
第1步:导入必要的包。
>>> from pyspark.ml.feature import HashingTF, Tokenizer
>>> from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
>>> from pyspark.sql import Row, SparkSession
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
>>> from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
>>> from pyspark.ml import Pipeline, PipelineModel
>>> from pyspark.ml.feature import VectorAssembler
>>> from pyspark.sql.types import DoubleType
>>> from pyspark.sql import SparkSession
第2步:读取Iris数据集,分别获取标签列和特征列,进行索引、重命名,并设置机器学习工作流。通过交叉验证把原始数据集分割为训练集与测试集。值得注意的是,只有训练集才可以用在模型的训练过程中,测试集则作为模型完成之后用来评估模型优劣的依据。此外,训练集中的样本数量必须足够大,一般至少要大于总样本数的50%,且两个子集必须从完整集合中均匀取样。
# 创建Spark会话
>>> spark = SparkSession.builder.appName("IrisClassification").getOrCreate()

# 读取数据
>>> path = "file:///usr/local/spark/iris.data"
>>> df_raw = spark.read.option("inferSchema", "true").csv(path).toDF("c0", "c1", "c2", "c3", "label")

# 转换列数据类型为Double
>>> df_double = df_raw.select(
            df_raw["c0"].cast(DoubleType()),
            df_raw["c1"].cast(DoubleType()),
            df_raw["c2"].cast(DoubleType()),
            df_raw["c3"].cast(DoubleType()),
            df_raw["label"]
)

# 创建特征向量
>>> assembler = VectorAssembler(inputCols=["c0", "c1", "c2", "c3"], outputCol="features")
>>> data = assembler.transform(df_double).select("features", "label")

# 划分训练集和测试集
>>> trainingData, testData = data.randomSplit([0.7, 0.3])

# 对标签进行索引
>>> labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# 对特征向量进行索引
>>> featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(data)

# 创建Logistic Regression模型
>>> lr = LogisticRegression(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=50)

# 将预测结果转换为原始标签
>>> labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# 创建Pipeline
>>> lrPipeline = Pipeline(stages=[labelIndexer, featureIndexer, lr, labelConverter])
第3步:使用ParamGridBuilder( )方法构造参数网格。其中,regParam参数是公式(9-12)中的γ,用于定义规范化项的权重;elasticNetParam参数是α,称为Elastic Net参数,取值介于0和1之间。elasticNetParam设置2个值,regParam设置3个值,最终有3×2 = 6个不同的模型将被训练。
# 创建参数网格
>>> paramGrid = ParamGridBuilder() \
            .addGrid(lr.elasticNetParam, [0.2, 0.8]) \
            .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
            .build()
第4步:构建针对整个机器学习工作流的交叉验证类,定义验证模型、参数网格,以及数据集的折叠数,并调用fit( )方法进行模型训练。其中,对于回归问题,评估器可选择RegressionEvaluator,对于二值数据可选择BinaryClassificationEvaluator,对于多分类问题可选择MulticlassClassificationEvaluator。评估器里默认的评估准则可通过setMetricName( )方法重写。
# 创建交叉验证器
>>> cv = CrossValidator(estimator=lrPipeline,estimatorParamMaps=paramGrid,
                 evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction"), numFolds=3)

# 在训练数据上进行交叉验证
>>> cvModel = cv.fit(trainingData)
第5步:调动transform( )方法对测试数据进行预测,并输出结果及精度。
# 使用交叉验证模型进行预测
>>> lrPredictions = cvModel.transform(testData)

# 显示前20个预测结果
>>> lrPredictions.select("predictedLabel", "label", "features", "probability").show(20)

# 遍历并输出每个预测结果
>>> for row in lrPredictions.select("predictedLabel", "label", "features", "probability").collect():
         predictedLabel = row["predictedLabel"]
         label = row["label"]
            features = row["features"]
            prob = row["probability"]
        print(f"({label}, {features}) --> prob={prob}, predicted Label={predictedLabel}")
(Iris-virginica, [7.2,3.6,6.1,2.5]) --> prob=[9.06268944830118e-05,0.011707200060683584,0.9882021730448335], predicted Label=Iris-virginica
……
# 创建多类分类评估器
>>> evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction")

# 计算模型的准确度
>>> lrAccuracy = evaluator.evaluate(lrPredictions)
第6步:获取最优的逻辑斯谛回归模型,并查看其具体的参数。
# 获取最佳模型
>>> bestModel = cvModel.bestModel

# 获取Logistic Regression模型
>>> lrModel = bestModel.stages[2]

# 输出模型的系数、截距、类别数和特征数
>>> print("Coefficients: " + str(lrModel.coefficientMatrix))
>>> print("Intercept: " + str(lrModel.interceptVector))
Intercept: [2.9478578061087863,3.9396489270310844,-6.887506733139871]
>>> print("numClasses: " + str(lrModel.numClasses))
numClasses: 3
>>> print("numFeatures: " + str(lrModel.numFeatures))
numFeatures: 4
# 解释模型的regParam参数
>>> print(lrModel.explainParam("regParam"))
regParam: regularization parameter (>= 0). (default: 0.0, current: 0.01)
# 解释模型的elasticNetParam参数
>>> print(lrModel.explainParam("elasticNetParam"))