林子雨编著《Spark编程基础(Python版)》教材第8章的命令行和代码

大数据学习路线图

林子雨、郑海山、赖永炫编著《Spark编程基础(Python版)》(教材官网)教材中的代码,在纸质教材中的印刷效果,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看所有章节代码

第8章 Spark MLlib

>>> from pyspark.ml.linalg import Vectors,Vector
#创建一个稠密本地向量
>>> vector = Vectors.dense(2.0,0.0,8.0)
#创建一个稀疏本地向量
#方法第二个参数是一个序列,其中每个元素都是一个非零值的元组:(index,elem)
>>> vector = Vectors.sparse(3, [(0, 2), (2.0, 8.0)])
#第二种创建稀疏本地向量的方法
#方法第二个参数是一个序列,其中每个元素都是一个非零值的元组:(index:elem)
>>> vector = Vectors.sparse(3, {0: 2.0, 2: 8.0})
#第三种创建稀疏本地向量的方法
#方法第二个参数数组指定了非零元素的索引,而第三个参数数组则给定了非零元素值
>>> vector = Vectors.sparse(3, [0, 2], [2.0, 8.0])
sudo pip3 install numpy
>>> from pyspark.mllib.linalg import Vectors #引入必要的包
>>> from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with label 1.0 (can be seen as a positive sample in classification) holding a dense vector.
>>> pos = LabeledPoint(1.0, Vectors.dense(2.0, 0.0, 8.0))
# Create a labeled point with label 0.0 (can be seen as a negative sample in classification) holding a sparse vector.
>>> neg = LabeledPoint(0.0, Vectors.sparse(3, [0, 2], [2.0, 8.0]))
>>> examples=spark.read.format("libsvm"). \
... load("file:///usr/local/spark/data/mllib/sample_libsvm_data.txt")
>>> examples.head()
Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0, 356: 253.0, 357: 252.0, 358: 195.0, 371: 57.0, 372: 252.0, 373: 252.0, 374: 63.0, 384: 253.0, 385: 252.0, 386: 195.0, 399: 198.0, 400: 253.0, 401: 190.0, 412: 255.0, 413: 253.0, 414: 196.0, 426: 76.0, 427: 246.0, 428: 252.0, 429: 112.0, 440: 253.0, 441: 252.0, 442: 148.0, 454: 85.0, 455: 252.0, 456: 230.0, 457: 25.0, 466: 7.0, 467: 135.0, 468: 253.0, 469: 186.0, 470: 12.0, 482: 85.0, 483: 252.0, 484: 223.0, 493: 7.0, 494: 131.0, 495: 252.0, 496: 225.0, 497: 71.0, 510: 85.0, 511: 252.0, 512: 145.0, 520: 48.0, 521: 165.0, 522: 252.0, 523: 173.0, 538: 86.0, 539: 253.0, 540: 225.0, 547: 114.0, 548: 238.0, 549: 253.0, 550: 162.0, 566: 85.0, 567: 252.0, 568: 249.0, 569: 146.0, 570: 48.0, 571: 29.0, 572: 85.0, 573: 178.0, 574: 225.0, 575: 253.0, 576: 223.0, 577: 167.0, 578: 56.0, 594: 85.0, 595: 252.0, 596: 252.0, 597: 252.0, 598: 229.0, 599: 215.0, 600: 252.0, 601: 252.0, 602: 252.0, 603: 196.0, 604: 130.0, 622: 28.0, 623: 199.0, 624: 252.0, 625: 252.0, 626: 253.0, 627: 252.0, 628: 252.0, 
629: 233.0, 630: 145.0, 651: 25.0, 652: 128.0, 653: 252.0, 654: 253.0, 655: 252.0, 656: 141.0, 657: 37.0}))
>>> from pyspark.ml.linalg import Matrix,Matrices  #引入必要的包
# Create a dense matrix with 3 rows and 2 columns: [ [1.0,2.0], [3.0,4.0], [5.0,6.0] ].
# Note that the value array is column-major, i.e. elements are taken column by column.
# NOTE(review): assigning to `Matrix` rebinds the name imported from pyspark.ml.linalg above.
>>> Matrix = Matrices.dense(3, 2, [1.0, 3.0, 5.0, 2.0, 4.0, 6.0])
# Create a sparse matrix with 3 rows and 2 columns: [ [9.0,0.0], [0.0,8.0], [0.0,6.0] ].
# The first array holds the column pointers, i.e. the start offset of each column's entries.
# The second array holds the row index of each stored entry.
# The third array holds the non-zero values in column-major order; together with the
# column pointers and row indices it determines the position of every entry.
>>> Matrix = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
>>> pipeline = Pipeline(stages=[stage1,stage2,stage3])
>>> from pyspark.ml.feature import HashingTF,IDF,Tokenizer
>>> sentenceData = spark.createDataFrame([(0, "I heard about Spark and I love Spark"),(0, "I wish Java could use case classes"),(1, "Logistic regression models are neat")]).toDF("label", "sentence")
>>> tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
>>> wordsData = tokenizer.transform(sentenceData)
>>> wordsData.show()
+-----+--------------------+--------------------+                               
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|I heard about Spa...|[i, heard, about,...|
|    0|I wish Java could...|[i, wish, java, c...|
|    1|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+
>>> hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2000)
>>> featurizedData = hashingTF.transform(wordsData)
>>> featurizedData.select("words","rawFeatures").show(truncate=False)
+---------------------------------------------+---------------------------------------------------------------------+
|words                                        |rawFeatures                                                          |
+---------------------------------------------+---------------------------------------------------------------------+
|[i, heard, about, spark, and, i, love, spark]|(2000,[240,333,1105,1329,1357,1777],[1.0,1.0,2.0,2.0,1.0,1.0])       |
|[i, wish, java, could, use, case, classes]   |(2000,[213,342,489,495,1329,1809,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[logistic, regression, models, are, neat]    |(2000,[286,695,1138,1193,1604],[1.0,1.0,1.0,1.0,1.0])                |
+---------------------------------------------+---------------------------------------------------------------------+
>>> idf = IDF(inputCol="rawFeatures", outputCol="features")
>>> idfModel = idf.fit(featurizedData)
>>> rescaledData = idfModel.transform(featurizedData)
>>> rescaledData.select("features", "label").show(truncate=False)
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                       |label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(2000,[240,333,1105,1329,1357,1777],[0.6931471805599453,0.6931471805599453,1.3862943611198906,0.5753641449035617,0.6931471805599453,0.6931471805599453])                       |0    |
|(2000,[213,342,489,495,1329,1809,1967],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453])|0    |
|(2000,[286,695,1138,1193,1604],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                               |1    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
name := "Simple Project"
version := "1.0"
pythonVersion := "3.6"
libraryDependencies += "pyspark" % "spark-mllib_2.11" % "2.4.0"
>>> from pyspark.ml.feature import StringIndexer
>>> df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],["id", "category"])
>>> indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
>>> model = indexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+
>>> from pyspark.ml.feature import IndexToString, StringIndexer
>>> toString = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> indexString = toString.transform(indexed)
>>> indexString.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+
>>> from pyspark.ml.feature import VectorIndexer
>>> from pyspark.ml.linalg import Vector, Vectors
>>> df = spark.createDataFrame([ \
... (Vectors.dense(-1.0, 1.0, 1.0),), \
... (Vectors.dense(-1.0, 3.0, 1.0),), \
... (Vectors.dense(0.0, 5.0, 1.0), )], ["features"])
>>> indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=2)
>>> indexerModel = indexer.fit(df)
>>> categoricalFeatures = indexerModel.categoryMaps.keys()
>>> print ("Choose"+str(len(categoricalFeatures))+ \
... "categorical features:"+str(categoricalFeatures))
Chose 2 categorical features: [0, 2]
>>> indexed = indexerModel.transform(df)
>>> indexed.show()
+--------------+-------------+
|      features|      indexed|
+--------------+-------------+
|[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
|[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
| [0.0,5.0,1.0]|[0.0,5.0,0.0]|
+--------------+-------------+
>>> from pyspark.ml.feature import ChiSqSelector, ChiSqSelectorModel
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame( [ \
... (1, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1), \
... (2, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0), \
... (3, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0) \
...  ], ["id", "features", "label"])
>>> df.show()
+---+------------------+-----+
| id|          features|label|
+---+------------------+-----+
|  1|[0.0,0.0,18.0,1.0]|    1|
|  2|[0.0,1.0,12.0,0.0]|    0|
|  3|[1.0,0.0,15.0,0.1]|    0|
+---+------------------+-----+
>>> selector = ChiSqSelector( \
... numTopFeatures = 1, \
... featuresCol = "features", \
... labelCol = "label", \
... outputCol = "selected-feature")
>>> selector_model = selector.fit(df)
>>> result = selector_model.transform(df)
>>> result.show()
+---+------------------+-----+----------------+
|id |features          |label|selected-feature|
+---+------------------+-----+----------------+
|1  |[0.0,0.0,18.0,1.0]|1.0  |[18.0]          |
|2  |[0.0,1.0,12.0,0.0]|0.0  |[12.0]          |
|3  |[1.0,0.0,15.0,0.1]|0.0  |[15.0]          |
+---+------------------+-----+----------------+
>>> from pyspark.ml.linalg import Vector,Vectors
>>> from pyspark.sql import Row,functions
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.feature import IndexToString, StringIndexer, \
... VectorIndexer,HashingTF, Tokenizer
>>> from pyspark.ml.classification import LogisticRegression, \
... LogisticRegressionModel,BinaryLogisticRegressionSummary, LogisticRegression
>>> def f(x):
...     rel = {}
...     rel['features']=Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark.sparkContext. \
... textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> data.show()
+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
only showing top 20 rows
>>> labelIndexer = StringIndexer(). \
... setInputCol("label"). \
... setOutputCol("indexedLabel"). \
... fit(data)
>>> featureIndexer = VectorIndexer(). \
... setInputCol("features"). \
... setOutputCol("indexedFeatures"). \
... fit(data)
>>> lr = LogisticRegression(). \
... setLabelCol("indexedLabel"). \
... setFeaturesCol("indexedFeatures"). \
... setMaxIter(100). \
... setRegParam(0.3). \
... setElasticNetParam(0.8)
>>> print("LogisticRegression parameters:\n" + lr.explainParams())

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: indexedFeatures)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: indexedLabel)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector size must beequal with 1 for binomial regression, or the number oflasses for multinomial regression. (undefined)
maxIter: max number of iterations (>= 0). (default: 100, current: 100)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)
regParam: regularization parameter (>= 0). (default: 0.0, current: 0.3)
standardization: whether to standardize the training features before fitting the model. (default: True)
threshold: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p]. (default: 0.5)
thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0, excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold. (undefined)
tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)
upperBoundsOnCoefficients: The upper bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
upperBoundsOnIntercepts: The upper bounds on intercepts if fitting under bound constrained optimization. The bound vector size must be equal with 1 for binomial regression, or the number of classes for multinomial regression. (undefined)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)
>>> labelConverter = IndexToString(). \
... setInputCol("prediction"). \
... setOutputCol("predictedLabel"). \
... setLabels(labelIndexer.labels)
>>> lrPipeline = Pipeline(). \
... setStages([labelIndexer, featureIndexer, lr, labelConverter])
>>> trainingData, testData = data.randomSplit([0.7, 0.3])
>>> lrPipelineModel = lrPipeline.fit(trainingData)
>>> lrPredictions = lrPipelineModel.transform(testData)
>>> preRel = lrPredictions.select( \
... "predictedLabel", \
... "label", \
... "features", \
... "probability"). \
... collect()
>>> for item in preRel:
...     print(str(item['label'])+','+ \
...     str(item['features'])+'-->prob='+ \
...     str(item['probability'])+',predictedLabel'+ \
...     str(item['predictedLabel']))

......
(Iris-setosa, [4.4,2.9,1.4,0.2]) --> prob=[0.2624350739662679,0.19160655295857498,0.5459583730751572], predicted Label=Iris-setosa
(Iris-setosa, [4.6,3.6,1.0,0.2]) --> prob=[0.2473632180957361,0.18060243561101244,0.5720343462932513], predicted Label=Iris-setosa
(Iris-setosa, [4.8,3.0,1.4,0.1]) --> prob=[0.2597529706621392,0.18432454082078972,0.5559224885170712], predicted Label=Iris-setosa
(Iris-versicolor, [4.9,2.4,3.3,1.0]) --> prob=[0.3424659128771462,0.31400211670249883,0.34353197042035494], predicted Label=Iris-setosa
(Iris-setosa, [4.9,3.0,1.4,0.2]) --> prob=[0.2624350739662679,0.19160655295857498,0.5459583730751572], predicted Label=Iris-setosa
(Iris-setosa, [4.9,3.1,1.5,0.1]) --> prob=[0.2635779329910567,0.18703879052941608,0.5493832764795272], predicted Label=Iris-setosa
(Iris-versicolor, [5.0,2.0,3.5,1.0]) --> prob=[0.34863710327362973,0.31966039326144235,0.3317025034649279], predicted Label=Iris-versicolor
(Iris-setosa, [5.0,3.2,1.2,0.2]) --> prob=[0.2548756158571469,0.18608731466228842,0.5590370694805646], predicted Label=Iris-setosa
(Iris-setosa, [5.0,3.4,1.6,0.4]) --> prob=[0.2749323011344446,0.21249363890622694,0.5125740599593285], predicted Label=Iris-setosa
(Iris-setosa, [5.0,3.5,1.6,0.6]) --> prob=[0.27934905017805023,0.22855938133781042,0.4920915684841392], predicted Label=Iris-setosa
>>> evaluator = MulticlassClassificationEvaluator(). \
... setLabelCol("indexedLabel"). \
... setPredictionCol("prediction")
>>> lrAccuracy = evaluator.evaluate(lrPredictions)
>>> lrAccuracy
0.7774712643678161 #模型预测的准确率
>>> lrModel = lrPipelineModel.stages[2]
>>> print ("Coefficients: \n " + str(lrModel.coefficientMatrix)+ \
... "\nIntercept: "+str(lrModel.interceptVector)+ \
... "\n numClasses: "+str(lrModel.numClasses)+ \
... "\n numFeatures: "+str(lrModel.numFeatures))

Coefficients: 
 3 X 4 CSRMatrix
(1,3) 0.4332
(2,2) -0.2472
(2,3) -0.1689
Intercept: [-0.11530503231364186,-0.63496556499483,0.750270597308472]
 numClasses: 3
 numFeatures: 4
>>> from pyspark.ml.classification import DecisionTreeClassificationModel
>>> from pyspark.ml.classification import DecisionTreeClassifier
>>> from pyspark.ml import Pipeline,PipelineModel
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.linalg import Vector,Vectors
>>> from pyspark.sql import Row
>>> from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer
>>> def f(x):
...     rel = {}
...     rel['features']=Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark.sparkContext. \
... textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> labelIndexer = StringIndexer(). \
... setInputCol("label"). \
... setOutputCol("indexedLabel"). \
... fit(data)
>>> featureIndexer = VectorIndexer(). \
... setInputCol("features"). \
... setOutputCol("indexedFeatures"). \
... setMaxCategories(4). \
... fit(data)
>>> labelConverter = IndexToString(). \
... setInputCol("prediction"). \
... setOutputCol("predictedLabel"). \
... setLabels(labelIndexer.labels)
>>> trainingData, testData = data.randomSplit([0.7, 0.3])
>>> dtClassifier = DecisionTreeClassifier(). \
... setLabelCol("indexedLabel"). \
... setFeaturesCol("indexedFeatures")
>>> dtPipeline = Pipeline(). \
... setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
>>> dtPipelineModel = dtPipeline.fit(trainingData)
>>> dtPredictions = dtPipelineModel.transform(testData)
>>> dtPredictions.select("predictedLabel", "label", "features").show(20)
+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,3.0,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.4,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.0,3.2,1.2,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.5,1.3,0.3]|
|    Iris-setosa|    Iris-setosa|[5.1,3.4,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.1,3.5,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.5,0.3]|
|Iris-versicolor|Iris-versicolor|[5.2,2.7,3.9,1.4]|
|    Iris-setosa|    Iris-setosa|[5.2,3.5,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.7,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.9,1.7,0.4]|
|Iris-versicolor|Iris-versicolor|[5.7,2.8,4.5,1.3]|
|    Iris-setosa|    Iris-setosa|[5.7,3.8,1.7,0.3]|
|Iris-versicolor|Iris-versicolor|[5.9,3.0,4.2,1.5]|
|Iris-versicolor|Iris-versicolor|[6.1,2.8,4.0,1.3]|
|Iris-versicolor|Iris-versicolor|[6.1,2.8,4.7,1.2]|
|Iris-versicolor|Iris-versicolor|[6.1,2.9,4.7,1.4]|
|Iris-versicolor|Iris-versicolor|[6.4,2.9,4.3,1.3]|
|Iris-versicolor|Iris-versicolor|[6.7,3.1,4.4,1.4]|
+---------------+---------------+-----------------+
>>> evaluator = MulticlassClassificationEvaluator(). \
... setLabelCol("indexedLabel"). \
... setPredictionCol("prediction")
>>> dtAccuracy = evaluator.evaluate(dtPredictions)
>>> dtAccuracy
0.9726976552103888  #模型的预测准确率
>>> treeModelClassifier = dtPipelineModel.stages[2]
>>> print("Learned classification tree model:\n" + \
... str(treeModelClassifier.toDebugString))

Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_5427198bb4c1) of depth 5 with 15 nodes
  If (feature 2 <= 2.45)
   Predict: 2.0
  Else (feature 2 > 2.45)
   If (feature 2 <= 4.75)
    Predict: 0.0
   Else (feature 2 > 4.75)
    If (feature 3 <= 1.75)
     If (feature 2 <= 4.95)
      Predict: 0.0
     Else (feature 2 > 4.95)
      If (feature 3 <= 1.55)
       Predict: 1.0
      Else (feature 3 > 1.55)
       Predict: 0.0
    Else (feature 3 > 1.75)
     If (feature 2 <= 4.85)
      If (feature 0 <= 5.95)
       Predict: 0.0
      Else (feature 0 > 5.95)
       Predict: 1.0
     Else (feature 2 > 4.85)
      Predict: 1.0
>>> from pyspark.sql import Row
>>> from pyspark.ml.clustering import KMeans,KMeansModel
>>> from pyspark.ml.linalg import Vectors
>>> def f(x):
...     rel = {}
...     rel['features'] = Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark. \
... sparkContext.textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> kmeansmodel = KMeans(). \
... setK(3). \
... setFeaturesCol('features'). \
... setPredictionCol('prediction'). \
... fit(data)
>>> results = kmeansmodel.transform(data).collect()
>>> for item in results:
...     print(str(item[0])+' is predcted as cluster'+ str(item[1]))

[5.1,3.5,1.4,0.2] is predcted as cluster1
[4.9,3.0,1.4,0.2] is predcted as cluster1
[4.7,3.2,1.3,0.2] is predcted as cluster1
[4.6,3.1,1.5,0.2] is predcted as cluster1
[5.0,3.6,1.4,0.2] is predcted as cluster1
[5.4,3.9,1.7,0.4] is predcted as cluster1
[4.6,3.4,1.4,0.3] is predcted as cluster1
[5.0,3.4,1.5,0.2] is predcted as cluster1
[4.4,2.9,1.4,0.2] is predcted as cluster1
[4.9,3.1,1.5,0.1] is predcted as cluster1
[5.4,3.7,1.5,0.2] is predcted as cluster1
[4.8,3.4,1.6,0.2] is predcted as cluster1
[4.8,3.0,1.4,0.1] is predcted as cluster1
[4.3,3.0,1.1,0.1] is predcted as cluster1
[5.8,4.0,1.2,0.2] is predcted as cluster1
… …
>>> results2 = kmeansmodel.clusterCenters()
>>> for item in results2:
...     print(item)

[ 5.9016129   2.7483871   4.39354839  1.43387097]
[ 5.006  3.418  1.464  0.244]
[ 6.85        3.07368421  5.74210526  2.07105263]
>>> kmeansmodel.computeCost(data)
78.94084142614622
>>> from pyspark.sql import Row
>>> from pyspark.ml.clustering import GaussianMixture, GaussianMixtureModel
>>> from pyspark.ml.linalg import Vectors
>>> def f(x):
...     rel = {}
...     rel['features'] = Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> gm = GaussianMixture(). \
... setK(3). \
... setPredictionCol("Prediction"). \
... setProbabilityCol("Probability")
>>> gmm = gm.fit(data)
>>> result = gmm.transform(data)
>>> result.show(150, False)                                                                        

+-----------------+----------+------------------------------------------------------------------+
|features         |Prediction|Probability                                                       |
+-----------------+----------+------------------------------------------------------------------+
|[5.1,3.5,1.4,0.2]|2         |[7.6734208374482E-12,3.834611512428377E-17,0.9999999999923265]    |
|[4.9,3.0,1.4,0.2]|2         |[3.166798629239744E-8,7.627279694305193E-17,0.9999999683320137]   |
|[4.7,3.2,1.3,0.2]|2         |[2.4270047914398485E-9,6.829198934603355E-17,0.9999999975729952]  |
|[4.6,3.1,1.5,0.2]|2         |[6.358110927310705E-8,6.742515135411673E-17,0.9999999364188907]   |
|[5.0,3.6,1.4,0.2]|2         |[2.76196999866254E-12,5.209131973934126E-17,0.9999999999972379]   |
|[5.4,3.9,1.7,0.4]|2         |[4.423526830721967E-13,3.661673865920354E-16,0.9999999999995574]  |
|[4.6,3.4,1.4,0.3]|2         |[1.996253995900862E-9,2.8082283131963835E-16,0.9999999980037457]  |
|[5.0,3.4,1.5,0.2]|2         |[1.5177480903927398E-10,3.3953493957830917E-17,0.9999999998482252]|
|[4.4,2.9,1.4,0.2]|2         |[8.259607931046402E-7,1.383366216513306E-16,0.9999991740392067]   |
>>> for i in range(3):
...     print("Component "+str(i)+ \
...    ": weight is "+str(gmm.weights[i])+ \
...    "\n mu vector is "+str( gmm.gaussiansDF.select('mean').head())+ \
...    "\n sigma matrix is "+ str(gmm.gaussiansDF.select('cov').head()))

......
Component 0 : weight is 0.6537361109963744
 mu vector is Row(mean=DenseVector([6.2591, 2.8764, 4.9057, 1.6784])) 
 sigma matrix is Row(cov=DenseMatrix(4, 4, [0.4414, 0.1249, 0.4524, 0.1676, 0.1249, 0.1103, 0.1469, 0.081, 0.4524, 0.1469, 0.6722, 0.2871, 0.1676, 0.081, 0.2871, 0.1806], False))
Component 1 : weight is 0.03291269344724756
 mu vector is Row(mean=DenseVector([6.2591, 2.8764, 4.9057, 1.6784])) 
 sigma matrix is Row(cov=DenseMatrix(4, 4, [0.4414, 0.1249, 0.4524, 0.1676, 0.1249, 0.1103, 0.1469, 0.081, 0.4524, 0.1469, 0.6722, 0.2871, 0.1676, 0.081, 0.2871, 0.1806], False))
Component 2 : weight is 0.31335119555637797
 mu vector is Row(mean=DenseVector([6.2591, 2.8764, 4.9057, 1.6784])) 
 sigma matrix is Row(cov=DenseMatrix(4, 4, [0.4414, 0.1249, 0.4524, 0.1676, 0.1249, 0.1103, 0.1469, 0.081, 0.4524, 0.1469, 0.6722, 0.2871, 0.1676, 0.081, 0.2871, 0.1806], False))
>>> from pyspark.ml.evaluation import RegressionEvaluator
>>> from pyspark.ml.recommendation import ALS
>>> from pyspark.sql import Row
>>> def f(x):
...     rel = {}
...     rel['userId'] = int(x[0])
...     rel['movieId'] = int(x[1])
...     rel['rating'] = float(x[2])
...     rel['timestamp'] = float(x[3])
...     return rel
>>> ratings = spark.sparkContext. \
... textFile("file:///usr/local/spark/data/mllib/als/sample_movielens_ratings.txt"). \
... map(lambda line: line.split('::')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> ratings.show()
+-------+------+-------------+------+
|movieId|rating|    timestamp|userId|
+-------+------+-------------+------+
|      2|   3.0|1.424380312E9|     0|
|      3|   1.0|1.424380312E9|     0|
|      5|   2.0|1.424380312E9|     0|
|      9|   4.0|1.424380312E9|     0|
|     11|   1.0|1.424380312E9|     0|
|     12|   2.0|1.424380312E9|     0|
|     15|   1.0|1.424380312E9|     0|
|     17|   1.0|1.424380312E9|     0|
|     19|   1.0|1.424380312E9|     0|
|     21|   1.0|1.424380312E9|     0|
|     23|   1.0|1.424380312E9|     0|
|     26|   3.0|1.424380312E9|     0|
|     27|   1.0|1.424380312E9|     0|
|     28|   1.0|1.424380312E9|     0|
|     29|   1.0|1.424380312E9|     0|
|     30|   1.0|1.424380312E9|     0|
|     31|   1.0|1.424380312E9|     0|
|     34|   1.0|1.424380312E9|     0|
|     37|   1.0|1.424380312E9|     0|
|     41|   2.0|1.424380312E9|     0|
+-------+------+-------------+------+
only showing top 20 rows
>>> training, test = ratings.randomSplit([0.8,0.2])
>>> alsExplicit  = ALS( \
... maxIter=5, \
... regParam=0.01, \
... userCol="userId", \
... itemCol="movieId", \
... ratingCol="rating")
>>> alsImplicit = ALS( \
... maxIter=5, \
... regParam=0.01, \
... implicitPrefs=True, \
... userCol="userId", \
... itemCol="movieId", \
... ratingCol="rating")
>>> modelExplicit = alsExplicit.fit(training)
>>> modelImplicit = alsImplicit.fit(training)
>>> predictionsExplicit = modelExplicit.transform(test)
>>> predictionsImplicit = modelImplicit.transform(test)
>>> predictionsExplicit.show()
+-------+------+-------------+------+-----------+                               
|movieId|rating|    timestamp|userId| prediction|
+-------+------+-------------+------+-----------+
|     31|   4.0|1.424380312E9|    12|  1.6642745|
|     31|   1.0|1.424380312E9|    13|  2.4953516|
|     31|   1.0|1.424380312E9|    24|   1.065728|
|     31|   1.0|1.424380312E9|     0| 0.27203378|
|     85|   1.0|1.424380312E9|    28| -4.3580985|
|     85|   1.0|1.424380312E9|    12| 0.40682888|
|     85|   1.0|1.424380312E9|    13|   3.660988|
|     85|   2.0|1.424380312E9|    20|  2.8040016|
|     85|   1.0|1.424380312E9|     5|  1.5130293|
|     85|   5.0|1.424380312E9|     8|  4.2697716|
|     85|   1.0|1.424380312E9|    29|-0.08079797|
|     65|   5.0|1.424380312E9|    23|   2.085921|
|     65|   1.0|1.424380312E9|    24| 0.24009112|
|     53|   1.0|1.424380312E9|    23| -1.0775211|
|     53|   1.0|1.424380312E9|    25|  0.9550235|
|     78|   1.0|1.424380312E9|    28|-0.26401743|
|     78|   1.0|1.424380312E9|    22| 0.60926896|
|     78|   1.0|1.424380312E9|    20| 0.12379208|
|     34|   1.0|1.424380312E9|    14| -0.2330051|
|     81|   3.0|1.424380312E9|    26|   5.684144|
+-------+------+-------------+------+-----------+
only showing top 20 rows

>>> predictionsImplicit.show()
+-------+------+-------------+------+-----------+                               
|movieId|rating|    timestamp|userId| prediction|
+-------+------+-------------+------+-----------+
|     31|   4.0|1.424380312E9|    12|0.012495369|
|     31|   1.0|1.424380312E9|    13| 0.29184195|
|     31|   1.0|1.424380312E9|    24|0.030779492|
|     31|   1.0|1.424380312E9|     0|-0.11765161|
|     85|   1.0|1.424380312E9|    28|  0.7986825|
|     85|   1.0|1.424380312E9|    12| 0.43071586|
|     85|   1.0|1.424380312E9|    13|-0.43334037|
|     85|   2.0|1.424380312E9|    20| 0.41960382|
|     85|   1.0|1.424380312E9|     5|-0.18235317|
|     85|   5.0|1.424380312E9|     8| 0.15960173|
|     85|   1.0|1.424380312E9|    29|0.087186486|
|     65|   5.0|1.424380312E9|    23| 0.93609256|
|     65|   1.0|1.424380312E9|    24| 0.21193008|
|     53|   1.0|1.424380312E9|    23| 0.30090192|
|     53|   1.0|1.424380312E9|    25| 0.07434524|
|     78|   1.0|1.424380312E9|    28| -0.3169421|
|     78|   1.0|1.424380312E9|    22|  0.4639935|
|     78|   1.0|1.424380312E9|    20|  0.5240105|
|     34|   1.0|1.424380312E9|    14|0.116244696|
|     81|   3.0|1.424380312E9|    26|  0.3299607|
+-------+------+-------------+------+-----------+
only showing top 20 rows
>>> evaluator = RegressionEvaluator(). \
... setMetricName("rmse"). \
... setLabelCol("rating"). \
... setPredictionCol("prediction")
>>> rmseExplicit=evaluator.evaluate(predictionsExplicit)
>>> rmseImplicit=evaluator.evaluate(predictionsImplicit)
>>> print("Explicit:Root-mean-square error = "+str(rmseExplicit))
Explicit:Root-mean-square error = 1.9093067340213505
>>> print("Explicit:Root-mean-square error = "+str(rmseImplicit))
Implicit:Root-mean-square error = 1.9965886603519194
>>> from pyspark.ml.linalg import Vector,Vectors
>>> from pyspark.ml.feature import HashingTF,Tokenizer
>>> from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
>>> from pyspark.sql import Row
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
>>> from pyspark.ml.classification import LogisticRegression, \
... LogisticRegressionModel
>>> from pyspark.ml import Pipeline, PipelineModel
>>> def f(x):
...     rel = {}
...     rel['features'] = Vectors. \
...     dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
...     rel['label'] = str(x[4])
...     return rel
>>> data = spark.sparkContext. \
... textFile("file:///usr/local/spark/iris.txt"). \
... map(lambda line: line.split(',')). \
... map(lambda p: Row(**f(p))). \
... toDF()
>>> trainingData, testData = data.randomSplit([0.7,0.3])
>>> labelIndexer = StringIndexer(). \
... setInputCol("label"). \
... setOutputCol("indexedLabel"). \
... fit(data)
>>> featureIndexer = VectorIndexer(). \
... setInputCol("features"). \
... setOutputCol("indexedFeatures"). \
... fit(data)
>>> lr = LogisticRegression(). \
... setLabelCol("indexedLabel"). \
... setFeaturesCol("indexedFeatures"). \
... setMaxIter(50)
>>> labelConverter =  IndexToString(). \
... setInputCol("prediction"). \
... setOutputCol("predictedLabel"). \
... setLabels(labelIndexer.labels)
>>> lrPipeline = Pipeline(). \
... setStages([labelIndexer, featureIndexer, lr, labelConverter])
>>> paramGrid = ParamGridBuilder(). \
... addGrid(lr.elasticNetParam, [0.2,0.8]). \
... addGrid(lr.regParam, [0.01, 0.1, 0.5]). \
... build()
>>> cv = CrossValidator(). \
... setEstimator(lrPipeline). \
... setEvaluator(MulticlassClassificationEvaluator(). \
... setLabelCol("indexedLabel"). \
... setPredictionCol("prediction")). \
... setEstimatorParamMaps(paramGrid). \
... setNumFolds(3) 
>>> cvModel = cv.fit(trainingData)
>>> lrPredictions=cvModel.transform(testData)
>>> lrPreRel = lrPredictions.select( \
... "predictedLabel", \
... "label", \
... "features", \
... "probability"). \
... collect()
>>> for item in lrPreRel:
...     print(str(item['label'])+','+ \
...     str(item['features'])+'-->prob='+ \
...     str(item['probability'])+',predictedLabel'+ \
...     str(item['predictedLabel']))

Iris-setosa,[4.7,3.2,1.6,0.2]-->prob=[0.0124531315584,3.61495729015e-06,0.987543253484],predictedLabelIris-setosa
Iris-setosa,[4.8,3.0,1.4,0.1]-->prob=[0.0189306218612,4.68811238788e-06,0.981064690026],predictedLabelIris-setosa
Iris-setosa,[4.8,3.4,1.9,0.2]-->prob=[0.00954865220928,2.15004457391e-06,0.990449197746],predictedLabelIris-setosa
Iris-setosa,[4.9,3.1,1.5,0.1]-->prob=[0.0164788072778,3.34188478227e-06,0.983517850837],predictedLabelIris-setosa
Iris-versicolor,[5.0,2.0,3.5,1.0]-->prob=[0.699989993024,0.295573613818,0.00443639315824],predictedLabelIris-versicolor
Iris-setosa,[5.0,3.0,1.6,0.2]-->prob=[0.0385031367429,1.48064614591e-05,0.961482056796],predictedLabelIris-setosa
Iris-setosa,[5.0,3.4,1.6,0.4]-->prob=[0.0125473025277,4.16328341655e-06,0.987448534189],predictedLabelIris-setosa
......  
>>> evaluator = MulticlassClassificationEvaluator(). \
... setLabelCol("indexedLabel"). \
... setPredictionCol("prediction")
>>> lrAccuracy = evaluator.evaluate(lrPredictions)
>>> bestModel= cvModel.bestModel
>>> lrModel = bestModel.stages[2]
>>> print("Coefficients: " + str(lrModel.coefficientMatrix) + \
... "Intercept: "+str(lrModel.interceptVector)+ \
... "numClasses: "+str(lrModel.numClasses)+ \
... "numFeatures: "+str(lrModel.numFeatures))
 Coefficients: DenseMatrix([[ 0.57547193, -0.3505967 ,  0.09896991, -0.01073066],
             [-0.00950188, -2.63413762,  0.96959059,  3.81872308],
             [-0.63212011,  3.61557516, -1.24757936, -2.20366198]])Intercept: [0.0988591576682,-0.14796420754,0.0491050498716]numClasses: 3numFeatures: 4
#explainParam returns a text description of a parameter, including its
#default value; regParam is one of the two parameters varied in paramGrid
>>> lr.explainParam(lr.regParam)
'regParam: regularization parameter (>= 0). (default: 0.0)'
#Same query for elasticNetParam, the other parameter varied in paramGrid
>>> lr.explainParam(lr.elasticNetParam)
 'elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)'