转载请注明:http://duanple.blog.163.com/blog/static/70971767201092673696/ 作者 phylips@bmy

5.性能

在本节中我们将通过运行在大集群的机器上的两个计算来测量MapReduce的性能。一个计算在大概1TB的数据中搜索给定模式的文本。另一个计算对接近1T的数据进行排序。 

这两个程序就可以代表MapReduce用户所写的实际程序中的大部分子集:一类是将数据从一种表现形式转换为另一种表现形式的程序,另一类就是从一个大数据集合中抽取少量感兴趣的数据集。 

5.1   集群配置

所有的程序都是在一个由将近1800台机器组成的集群上执行。每台机器有2个打开了超线程的2G Intel Xeon处理器,4GB内存,2个160GB IDE硬盘,一个gigabit 以太网链路。这些机器安排在一个两级的树形交换网络上,根节点具有接近100-200 Gbps的总体带宽。所有机器具有相同的配置,因此在任意两个机器间的往返时间小于1ms。 

在4GB内存中,大概1-1.5G内存预留给在集群上运行的其他task。程序在一个周末的下午执行,此时cpu 硬盘 网络接近空闲。

 5.2   Grep

Grep程序通过扫描10^10个100字节的记录,查找一个很少出现的三字符模式(该模式出现在92337个记录里)。输入被划分为近似64MB大小的片段(M=15000),整个输出被放在一个文件中(R=1)。

【google论文三】MapReduce:简化大集群上的数据处理(下) - 星星 - 银河里的星星 

图2展示了整个计算的处理过程。Y轴表示输入数据的扫描速率。伴随这安排用于进行该MapReduce操作的机器数的增多,该速率也在逐渐攀升,当有1764个worker被分配该任务后达到了30GB/s的峰值。当map task结束后,该速率开始下降,大概在80秒的时候基本上降为0。整个计算过程花费了接近150秒,这包括一分钟的启动时间(这个开销主要是由将程序传输给所有worker,与GFS交互以打开1000个输入文件以及得到本地化优化所需要的信息造成的)。

 5.3   排序

排序程序对10^10个100字节的记录进行排序(接近1TB数据)。这个程序根据TeraSort Benchmark进行了建模。

 排序程序总共由不到50行用户代码组成。Map函数只有3行,将10字节长的排序用key值从一个文本行中抽取出来,然后输出该key,以及原始的文本行,作为中间结果key/value对。我们使用内建的Identity函数作为reduce操作。该函数将中间结果不过任何改变地输出。最后的排好序的结果写到一个具有2个副本的GFS文件集合上(即该程序将会产生2TB的输出)。

 与之前的类似,输入数据被划分为64MB的片段(M=15000)。排好序的输出被划分为4000个输出(R=4000)。划分函数使用key的字节表示来将它们划分为R个片段。

 对于该benchmark的划分函数建立在对于key值分布的了解上。对于一个通常的排序问题里,我们会增加一个预先进行的MapReduce操作,该操作会收集key值的采样值,然后使用这些key值的采样来计算最终排序序列的划分点。

【google论文三】MapReduce:简化大集群上的数据处理(下) - 星星 - 银河里的星星 

图3(a)展示了该排序程序的一个正常的处理过程。左上角的图表示输入速率。峰值速率大概是13GB/s,由于所有的map task在200秒前都结束了,所以该速率下降的很快。可以看到,输入速率要小于grep。这是因为排序的map task花费了大概一半的时间和IO带宽将中间结果写入到本地硬盘上。而与之相比,grep的中间结果输出几乎可以忽略不及。

 左边中间的图展示了数据从Map task向reduce task的网络传输速率。当第一个map task完成后,shuffling就开始了。图中的第一个峰值是由于第一批的1700个reduce task都启动后产生的(整个MapReduce操作被分配给大概大概1700个机器,每个机器同一时刻最多执行一个Reduce task)。大体上在300秒的时候,这一批的reduce task结束,然后启动了剩余的reduce task的shffling过程。大概在600秒时,这些shuffling才结束。

 左边最底下的图形展示了reduce task将排好序的数据写入最终输出文件的速率。在第一次的shuffling的结束与数据写入开始之间存在一个延时,是因为机器此时正忙着对中间数据进行排序。写入过程在一段时间内大概持续着大概2-4GB/s的速率。所有的写入大概在850秒的时候结束。假设启动的花费,整个计算花费了891秒。这接近于当前Terasort benchmark的最快结果1057秒。

 另外需要指出的是:输入速率比shuffle速率和输出速率高是因为我们的本地化优化(大部分的数据都是从本地硬盘读取的,这就绕过了网络带宽的限制)。Shuffle速率比输出速率高是因为输出阶段要写两份拷贝(保存两份是为了可靠性和可用性){这两份拷贝是需要耗费网络带宽的}。写两个副本是因为这是我们的底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用了erasure code而不是副本,对于写数据的网络带宽需求将会减少。

 5.4   任务备份的影响

在图3(b),我们展示了一个没有开启任务备份的排序程序的执行过程。执行流类似与我们在图3(a)里看到的那样。除了在繁重的写活动出现后出现了一个长尾。在960秒时,只剩下5个reduce task还没有完成。然而这些掉队者,在300秒后才完成。整个计算花费了1283秒,增加了44%。

5.5   机器失败

图3(3),展示了我们在计算执行几分钟后,杀掉1746个worker里面的200个后的执行过程。底层的集群调度器,立刻重启在这些机器上的worker进程(因为只是进程被杀掉了,机器仍然是可用的)。

 死掉的worker作为负的输入速率进行显示,因为前面以及完成的map task的工作都消失了需要重新执行。Map task的重新执行相对较快。加上启动时间,整个计算过程在933秒的时候结束,仅仅比正常情况下的执行时间增加了5%。

 6.  经验

在2003年2月,我们写出了第一版的MapReduce库,2007年8月对它进行了很多包括本地化优化,跨机器的任务执行的动态负载平衡等等在内的改进。从那时起,我们欣喜的发现MapReduce库可以如此广泛地应用在我们工作中的各种问题上。目前它已经在google内部应用在广泛的领域上: 

大规模机器学习问题

用于Google新闻和购物的聚类问题

找到最流行的查询词

为了实验或者产品从网页中抽取属性(比如为了本地化搜索从大量网页中抽取地理位置)

大规模图形计算

【google论文三】MapReduce:简化大集群上的数据处理(下) - 星星 - 银河里的星星 

图4展示了过去的时间里,提交到我们的源代码管理系统中的MapReduce程序的数目。从2003年的0到2004年9月接近900个。MapReduce之所以如此成功,是因为它使得在半小时内写出一个简单地可以在数千台机器上跑的程序成为可能。这大大加速了我们的开发和原型周期。此外,它还使得没有分布式或者并行系统编程经验的程序员可以很容易地使用大量的计算资源。

【google论文三】MapReduce:简化大集群上的数据处理(下) - 星星 - 银河里的星星 

 在每个job结束时,MapReduce库还会记录该job使用的计算资源的统计信息。表1,我们展示了2004年8月,在google内部运行MapReduce job的一个子集的一些统计信息。

6.1   大规模索引

目前为止,我们一个最重要的MapReduce应用就是用它完全重写了产品索引系统,该系统为google的网页搜索服务产生所需要的数据结果。索引系统以一个由爬虫抓取的存储在GFS上的很大的文档集合作为输入,总共数据量要超过20TB。索引流程由5到10个MapReduce操作组成。通过使用MapReduce(而不是使用之前版本的索引系统所使用的自适应的分布式传输)有如下几个优点:

索引代码很简单,少而且容易理解。因为用于容错,分布和并行化的代码都隐藏在了MapReduce库中。比如,我们通过使用MapReduce将原来的一个计算过程的代码量从3800行降低到了700行。

 MapReduce库的性能以及足够好了,这样我们就能将不相关地计算分离,而不是为了降低额外的传输费用而将它们合在一块。这使得我们很容易改变索引处理过程。比如过去在旧系统中可能需要几个月才能完成的变更,现在在新的系统中几天就可以完成。

 索引处理流程变得很容易操作。因为大部分由于机器失败,慢机器以及网络引发的问题都由MapReduce库自动处理掉了,不需要进行额外的干预。另外也很容易通过给索引系统增加新机器来提高性能。

 

7.  相关工作

已经有很多系统提供了严格的编程模型,使用了很多限制来进行计算的并行化。MapReduce模型可以看做是基于我们的在现实中的海量计算经验,对这些模型的一个简化和提炼。更重要的是,我们提供了一个可以扩展到数千个处理器上的容错实现。与之相比,大部分的并行处理系统只是在小规模集群上实现的,将机器错误处理交给程序员。

 大同步模型和一些MPI实现为简化程序员编写并行程序提供了更高级别的抽象。这些系统与MapReduce的一个关键不同就是MapReduce使用了一个限制性的编程模型来为用户程序提供自动地并行化和透明的容错机制。

 我们的本地化优化策略主要源于这样的一些技术,比如active disks,在那里为了降低IO或者网络的数据传输,计算被放到那些靠近本地硬盘的处理元素中执行。我们是在由少量硬盘直接连接的PC上运行而不是在一个磁盘控制处理器上运行,但是策略是类似的。

 我们的任务备份机制类似于Charlotte系统中使用的eager调度机制。简单eager调度机制的一个缺点是如果给定的task引发了重复的失败,整个计算就无法完成。我们通过跳过坏记录的方式解决了这样的问题。

 MapReduce实现依赖于内部开发的一个集群管理系统,它负责在一个机器集合上分布调度用户任务。尽管不是本文关注的重点,该集群管理系统类似于Condor。

 作为MapReduce库的一部分的排序设施在操作过程上类似于Now-sort。源机器(map task)将数据划分进行排序,然后将每份传递给一个R个reduce worker中的一个。每个reduce worker在本地进行排序(如果可能的话就仅使用内存排序)。当然NOW-sort并不包含使得我们的库应用广泛的Map和Reduce函数。

 Rive提供了一个进程间通过分布式队列进行数据传输的编程模型。像MapReduce一样,River尽量提高系统的平均性能,即使是由于硬件异构或者系统扰动出现了非对称的情况。River通过仔细的硬盘和网络传输调度来达到平衡的完成时间。MapReduce使用了不同的策略。通过限制编程模型,MapReduce框架能将问题划分为大量的细粒度task。这些task可以在可用的worker上进行动态的调度,这样跑的快的worker就可以处理更多的task。该编程模型也允许我们在job快结束的时候调度task进行冗余的执行,这样大大减少了非对称出现时的完成时间。

 BAD-FS有一个与MapReduce完全不同的编程模型。与MapReduce不同,它的目标是降低在广域网上的job的执行时间。但是,它们具有两个基本的相同点:1.都采用了冗余执行从失败导致数据丢失中快速恢复 2.都采用了本地化优化以降低数据在网络上的传输。

 TACC是一个设计用户简化构建高可用网络服务的系统。与MapReduce类似,它依赖于重新执行作为实现容错的一个机制。

 8.  总结

MapReduce编程模型已经因各种目的在google内部成功使用。我们将这种成功归为几个原因。首先,模型很容易使用,即使对于没有分布式编程经验的程序员来说也是,因为它隐藏了并行化,容错,本地化优化,负载平衡的细节。第二,大量的问题可以简单地用MapReduce计算来表达。比如MapReduce被用来为google的网页搜索服务,排序,数据挖掘,机器学习很多其他的系统生成数据。第三,我们开发了一个可以扩展到数千台机器上MapReduce实现。该实现可以充分利用机器的资源,因此很适合用来处理在google碰到的很多大规模计算问题。

 通过这项工作我们也学到了很多。首先,通过限制编程模型可以使计算的并行化和分布很简单,同时也能让它容错。第二,网络带宽是一种稀缺资源。我们系统中大量的优化都是为了降低网络传输数据量:本地化优化允许我们从本地磁盘上读数据,将单份拷贝的中间数据写入本地磁盘节省了网络带宽。第三,冗余执行能用来降低慢机子的影响,以及用来处理机器失败和数据丢失。

 

致谢:

……

MapReduce从GFS上读取输入数据以及写出输出数据,因此我们要感谢…在开发GFS上的工作…我们还要感谢…在开发MapReduce使用的集群管理系统上的工作……