MLlib的设计理念非常简单:把数据以RDD的形式表示,然后在分布式数据集上调用各种算法。
MLlib引入了一些数据类型(比如点和向量),不过归根结底,MLlib就是RDD上一系列可供调用的函数集合。比如要用MLlib来完成文本分类任务(例如识别垃圾邮件),需要如下步骤:
(1)首先用字符串RDD来表示你的消息。
(2)运行MLlib中的一个特征提取(feature extraction)算法来把文本数据转换为数值特征(适合机器学习算法处理);该操作会返回一个向量RDD。
(3)对向量RDD调用分类算法(比如逻辑回归);这步会返回一个模型对象,可以使用该对象对新的数据进行分类。
(4)使用MLlib的评估函数在测试数据集上评估模型。
**MLlib中只包含能够在集群上运行良好的算法。
例:垃圾邮件分类
python版本
#coding = utf-8

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD

spam = sc.textFile("spam.txt")
normal = sc.textFile("normal.txt")

#创建一个HashingTF实例来吧邮件文本映射为包含10000个特征的向量
tf = HashingTF(numFeatures = 10000)
#各邮件都被切分为单词,每个单词被映射为一个特征
spamFeatures = spam.map(lambda email: tf.transfrom(eamil.split(" ")))
normalFeatures = normal.map(lambda: email: tf.transfrom(email.split(" ")))

#创建:LablePoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeEsamples = normalFeatures.map(lambda features:LabeledPoint(0, features))
trainingData = positiveExamples.union(negativeEsamples)
trainingData.cache()#应为逻辑回归是迭代算法,所以缓存训练数据RDD

#使用SGD算法运行逻辑回归
model = LogistcRegressionWithSDG.tran(trainingData)

#以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试。首先利用一样的HashingTF特征值来的到特征向量,然后对该向量应用得到的模型
posTest = tf.transfrom("O M G GET cheap stuff by sending money to ...".split(" "))
negTest = tf.transfrom("Hi Dad, I started studying Spark the other ...".split(" "))
print "Prediction for positive test example: %g"% model.predict(posTest)
print "Prediction for negative text example: %g"% model.predict(negTest)
~                                                                             

数据类型


MLlib包含一些特有的数据类型,它有位于org.apache.spark.mllib包(Java/Scala)或者pyspark.mllib(Python)内。主要有如下所列:
Vector
一个数据向量。MLlib既支持稠密向量也支持稀疏向量。
LabeledPoint
用来表示带有标签的数据点。它包含一个特征向量和一个标签(由一个浮点数表示),位于mllib.regression包中。
Rating
用户对一个产品的评分,在mllib.recommendation包中,用于产品推荐。
各种Model类
每个Model都是训练算法的结果,一般有一个predict()方法可以用来对新的数据点或数据点组成的RDD应用该模型进行预测。

操作向量

向量有两种:稀疏向量和稠密向量。
在不同语言中创建向量的方式可能不同。
用Python创建向量:
from numpy import array
from pyspark.mllib.linalg import Vectors

#创建稠密向量<1.0,2.0,3.0>
denseVec1 = array([1.0, 2.0, 3.0])#NumPy数组可以直接传递给Mllib
denseVec2 = Vectors.dense([1.0, 2.0, 3.0])#或者使用B+Vectors类来创建

#创建稀疏向量<1.0, 0.0, 2.0, 0.0>;该方法只接收向量的维度(4)以及非零的位置对应的值
#这些数据可以用一个dictionary来传递,或使用两个分别代表位置和值的list
sparseVec1 = Vectors.sparse(4,{0: 1.0, 2:2.0})
sparseVec2 = Vectors.sparse(4,[0,2],[1.0, 2.0])