机器学习笔记(Chapter 15 - MapReduce框架)

MapReduce是一个分布式计算框架,可以将单个计算作业分配给多台计算机执行。

MapReduce工作流程

  • MapReduce框架的优点是可以短时间内完成大量工作,缺点是算法必须经过重写,需要对系统工程有一定理解。适用于数值型和标称型数据。
  • MapReduce工作流程是:单个作业被分成很多小份,输入数据被切片分发到每个节点,各个节点只在本地数据上做运算,对应的运算代码称为mapper,该过程称为map阶段。每个mapper的输出通过某种方式组合(一般还会做排序),排序后的结果再被分成小份分发给各个节点进行下一步处理。第二步处理阶段称为reduce,对应运行代码称为reducer。reducer的输出为程序最终执行结果。
  • 在任何时候,每个mapper或reducer之间都不进行通信。每个节点值处理自己的事务,且在本地分配的数据集上计算。
  • 主节点控制MapReduce的作业流程,数据被重复存放在不同的机器上防止某个机器失效。mapper和reducer传输的数据形式为key/value对。

MapReduce上的机器学习

  • 简单贝叶斯:直接使用reducer将各个mapper的结果相加
  • k-近邻算法:构建树存储数据,利用树形结构缩小搜索范围,该方法在特征数小于10的情况下效果很好。高维数据下(文本、图像、视频)的近邻查找方法是局部敏感哈希算法。
  • 支持向量机:SMO算法构造的SVM无法在MapReduce框架实现,但Pegasos算法构造的SVM和“最邻近支持向量机”更快并且易于在MapReduce框架下实现。
  • 奇异值分解:Lanczos算法是一个有效的求近似特征值的算法,可以应用在MapReduce上从而有效找到大数据的奇异值。该算法还可以应用于PCA。
  • K-均值聚类:canopy聚类是一个流行的分布式聚类方法,可以先调用canopy聚类法取得最初的k个簇,再运行K-均值聚类算法。

在Python中使用mrjob自动化MapReduce

  • mrjob之前是Yelp的内部框架,2010年底开源。可以用于在Amazon网络服务上启动MapReduce
    作业。可以通过pip安装,也可以clone GitHub上的源码来安装。在AWS上使用mrjob之前需要设置AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY两个环境变量。
  • 使用mrjob可以在EMR上运行Hadoop流,也可以在单机上测试。单机测试的命令为% python mrMean.py < inputFile.txt > myOut.txt,在EMR上运行同样任务的命令为% python mrMean.py -r emr < inputFile.txt > myOut.txt。所有上传和表单填写由mrjob自动完成。
  • 添加下面代码到mrMean.py,创建一个新的MRJob继承类,代码中的mapper和reducer都是该类的方法。steps方法定义了执行的步骤,在该方法中需要为mrjob制定mapper和reducer的名称,未指出则默认调用mapper和reducer。将原来代码中的mr方法修改为mrjob.step.MRStep
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from mrjob.job import MRJob
import mrjob

class MRmean(MRJob):
def __init__(self, *args, **kwargs):
super(MRmean, self).__init__(*args, **kwargs)
self.inCount = 0
self.inSum = 0
self.inSqSum = 0
def map(self, key, val):
if False:
yield
inVal = float(val)
self.inCount += 1
self.inSum += inVal
self.inSqSum += inVal*inVal
def map_final(self):
mn = self.inSum/self.inCount
mnSq = self.inSqSum/self.inCount
yield(1, [self.inCount, mn, mnSq])
def reduce(self, key, packedValues):
cumVal = 0.0; cumSumSq = 0.0; cumN = 0.0
for valArr in packedValues:
nj = float(valArr[0])
cumN += nj
cumVal += nj*float(valArr[1])
cumSumSq += nj*float(valArr[2])
mean = cumVal/cumN
var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN
yield(mean, var)
def steps(self):
return ([mrjob.step.MRStep(mapper=self.map, reducer=self.reduce,\
mapper_final=self.map_final)])

if __name__ == '__main__':
MRmean.run()

$ python mrMean.py --mapper < inputFile.txt
1 [100, 0.5095697, 0.34443931307936]

$ python mrMean.py < inputFile.txt
reading from STDIN
writing to %\mrMean.Forec.20160227.045814.965000\step-0-mapper_part-00000
Counters from step 1:
(no counters found)
writing to %\mrMean.Forec.20160227.045814.965000\step-0-mapper-sorted
> sort '%\mrMean.Forec.20160227.045814.965000\step-0-mapper_part-00000'
writing to %\mrMean.Forec.20160227.045814.965000\step-0-reducer_part-00000
Counters from step 1:
(no counters found)
Moving %\mrMean.Forec.20160227.045814.965000\step-0-reducer_part-00000\
-> %\mrMean.Forec.20160227.045814.965000\output\part-00000
Streaming final output from %\mrMean.Forec.20160227.045814.965000\output
0.5095697 0.08477803392126998
removing tmp directory %\mrMean.Forec.20160227.045814.965000

分布式SVM的Pegasos算法

Pegasos算法

  • SMO算法的一个替代品是Pegasos算法,后者可以很容易写成MapReduce形式。Pegasos是指原始估计梯度求解器(Primal Estimated sub-GrAdient Solver)。该算法使用某种形式的随机梯度下降方法来解决SVM所定义的优化问题,该算法所需的迭代次数取决于用户所期望的精确度而不是数据集的大小。其工作流程是:从训练集中随机挑选一些样本点添加到待处理列表中,之后按序判断每个样本点是否能被分类正确;如果是则忽略,否则将其加入待更新集合。批处理完毕后,权重向量按照这些错分的样本进行更新。伪代码为:
    • 将W初始化为0
    • 对每次批处理
    •     随机选择k个样本点(向量)
    •     对每个向量
    •         如果该向量被错分
    •             更新权重向量W
    •     累加对W的更新
  • 代码为Pegasos算法的串行版本,输入值T和k分别设定了迭代次数和待处理列表的大小。在T次迭代过程中,每次需要重新计算eta。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def predict(w, x):
return w*x.T
def batchPegasos(dataSet, labels, lam, T, k):
import random
m, n = shape(dataSet); w = zeros(n)
dataIndex = range(m)
for t in range(1, T+1):
wDelta = mat(zeros(n))
eta = 1.0/(lam*t)
random.shuffle(dataIndex)
for j in range(k):
i = dataIndex[j]
p = predict(w, dataSet[i,:])
if labels[i]*p < 1:
wDelta += labels[i]*dataSet[i,:].A
w = (1.0 - 1/t)*w + (eta/k)*wDelta
return w

mrjob实现MapReduce版本的SVM

  • Pegasos算法有大量的内积计算,内积计算可以并行。Cinfigure_options方法建立了一些变量,包括迭代次数T,待处理列表大小k。steps方法告诉mrjob应该做什么,按照什么顺序做。其创建了一个python列表,包含mapmap_finreduce几个步骤,最后将该列表乘以迭代次数,即在每次迭代中重复调用这个列表。mapper需要能够正确读取reducer输出的数据,对输入和输出格式作如下规定:Mapper输入为<mapperNum, valueList>,无输出;Mapper_final无输入,输出为<l, valueList>;Reducer的输入输出均为<mapperNum, valueList>。传入的值是列表数组,valueList第一个元素是一个字符串,表示列表后面存放的数据类型,每个Mapper_final都将输出同样的key以保证所有的key/value都输出给同一个reducer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from mrjob.job import MRJob
import mrjob
import pickle
from numpy import *

class MRsvm(MRJob):
DEFAULT_INPUT_PROTOCOL = 'json_value'

def __init__(self, *args, **kwargs):
super(MRsvm, self).__init__(*args, **kwargs)
self.data = pickle.load(open('%\\svmDat27'))
self.w = 0
self.eta = 0.69
self.dataList = []
self.k = self.options.batchsize
self.numMappers = 1
self.t = 1

def map(self, mapperId, inVals): #needs exactly 2 arguments
#input: nodeId, ('w', w-vector) OR nodeId, ('x', int)
if False: yield
if inVals[0]=='w': #accumulate W-vector
self.w = inVals[1]
elif inVals[0]=='x':
self.dataList.append(inVals[1])#accumulate data points to calc
elif inVals[0]=='t': self.t = inVals[1]
else: self.eta=inVals #this is for debug, eta not used in map

def map_fin(self):
labels = self.data[:,-1]; X=self.data[:,0:-1]#reshape data into X and Y
if self.w == 0: self.w = [0.001]*shape(X)[1] #init w on first iteration
for index in self.dataList:
p = mat(self.w)*X[index,:].T #calc p=w*dataSet[key].T
if labels[index]*p < 1.0:
yield (1, ['u', index])#make sure everything has the same key
yield (1, ['w', self.w]) #so it ends up at the same reducer
yield (1, ['t', self.t])

def reduce(self, _, packedVals):
for valArr in packedVals: #get values from streamed inputs
if valArr[0]=='u': self.dataList.append(valArr[1])
elif valArr[0]=='w': self.w = valArr[1]
elif valArr[0]=='t': self.t = valArr[1]
labels = self.data[:,-1]; X=self.data[:,0:-1]
wMat = mat(self.w); wDelta = mat(zeros(len(self.w)))
for index in self.dataList:
wDelta += float(labels[index])*X[index,:] #wDelta += label*dataSet
eta = 1.0/(2.0*self.t) #calc new: eta
#calc new: w = (1.0 - 1/t)*w + (eta/k)*wDelta
wMat = (1.0 - 1.0/self.t)*wMat + (eta/self.k)*wDelta
for mapperNum in range(1,self.numMappers+1):
yield (mapperNum, ['w', wMat.tolist()[0] ]) #emit w
if self.t < self.options.iterations:
yield (mapperNum, ['t', self.t+1])#increment T
for j in range(self.k/self.numMappers):#emit random ints for mappers iid
yield (mapperNum, ['x', random.randint(shape(self.data)[0]) ])

def configure_options(self):
super(MRsvm, self).configure_options()
self.add_passthrough_option('--iterations', dest='iterations', default=2,\
type = 'int', help='T: number of iterations to run')
self.add_passthrough_option('--batchsize', dest='batchsize', default=100,\
type='int', help='k: number of data points in a batch')

def steps(self):
return ([mrjob.step.MRStep(mapper=self.map, mapper_final=self.map_fin, \
reducer=self.reduce)]*self.options.iterations)

if __name__ == '__main__':
MRsvm.run()
  • 大多数情况下并不需要使用MapReduce框架,如果作业花费了太多时间,首先应思考能否用更高效的语言编写,或者是否可以优化。寻找影响处理速度的瓶颈才能根本解决效率底下的问题。

MapReduce总结

当运算需求超出了当前资源的运算能力,可以考虑购买更好的机器,或者租用网络服务并使用MapReduce框架并行执行。很多机器学习算法都可以容易地写成MapReduce作业,而某些需要经过重写。大部分情况下,MapReduce并不需要。


参考文献: 《机器学习实战 - 美Peter Harrington》

原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(https://forec.github.io/2016/02/27/machinelearning15/) 、作者信息(Forec)和本声明。

分享到