賈歐陽 阮樹驊 田興 楊峻興 李丹
摘 要: 由Apache軟件基金會開發(fā)的Hadoop分布式系統(tǒng)基礎(chǔ)架構(gòu),作為一個主流的云計算平臺,其核心框架之一的MapReduce性能已經(jīng)成為一個研究熱點,其中對于Shuffle階段的優(yōu)化,使用Combine優(yōu)化機(jī)制是關(guān)鍵。文章詳細(xì)介紹了MapReduce計算框架及Shuffle流程;分別從機(jī)理簡介、執(zhí)行時機(jī)、運行條件三方面詳細(xì)闡述了如何利用Combine優(yōu)化機(jī)制;通過搭建Hadoop集群,運用MapReduce分布式算法測試實驗數(shù)據(jù)。實驗結(jié)果充分證明,正確地運用Combine優(yōu)化機(jī)制能顯著提高M(jìn)apReduce框架的性能。
關(guān)鍵詞: 云計算; Hadoop; MapReduce; Shuffle; Combine
中圖分類號:TP393.2 文獻(xiàn)標(biāo)志碼:A 文章編號:1006-8228(2013)09-01-04
0 引言
我們正處在一個數(shù)據(jù)爆炸的時代,數(shù)據(jù)生成速度之快令人驚訝:紐約證券交易所每天產(chǎn)生1TB的數(shù)據(jù),F(xiàn)acebook存儲著約100億張照片占用了約1PB存儲容量[1]。由Apache軟件基金會開發(fā)的Hadoop分布式系統(tǒng)基礎(chǔ)架構(gòu),正是為了解決海量數(shù)據(jù)的存儲和計算問題,并且由于其高可靠性、高可擴(kuò)展性、高效性和高容錯性,已經(jīng)成為一種主流的云計算平臺[2]。Hadoop核心框架之一的MapReduce,在性能優(yōu)化和提高等方面的問題已經(jīng)被學(xué)術(shù)界和工業(yè)界所關(guān)注,而其中很重要的一部分是對Shuffle階段的優(yōu)化。本文詳細(xì)介紹了MapReduce框架和Shuffle階段流程,研究分析了Shuffle優(yōu)化過程中利用Combine優(yōu)化機(jī)制存在的問題,通過實驗和理論分析找出了解決方案,提出了Combine優(yōu)化機(jī)制的執(zhí)行時機(jī)和運行條件,并利用實例數(shù)據(jù)充分證明了正確地利用Combine優(yōu)化機(jī)制能顯著提高M(jìn)apReduce框架性能。
1 MapReduce框架
1.1 框架簡介
MapReduce是一種能夠在普通配置計算機(jī)上并行處理大量數(shù)據(jù)的并行計算框架,使用這個框架,我們只需要編寫我們想要執(zhí)行的簡單運算即可,寫出的程序自動在集群上并行執(zhí)行,而不必關(guān)心集群之間的調(diào)度、數(shù)據(jù)分布、負(fù)載均衡、容災(zāi)備份、通信等復(fù)雜細(xì)節(jié),這些問題都已經(jīng)被MapReduce框架封裝好了。
MapReduce可以讓沒有任何分布式、并行編程經(jīng)驗的程序員很容易地利用分布式系統(tǒng)的資源[3],用戶只需要自己定義map函數(shù)和reduce函數(shù),這兩個函數(shù)的靈感來自Lisp和許多其他函數(shù)式語言的map和reduce原語[4],map函數(shù)從輸入數(shù)據(jù)中獲取鍵/值對同時生成一個中間鍵值對集合,reduce函數(shù)合并有相同中間鍵的中間值,從而得到一個想要的結(jié)果。MapReduce框架會在任務(wù)運行時關(guān)注調(diào)度任務(wù),并監(jiān)視任務(wù)的執(zhí)行狀況,如果執(zhí)行失敗,將重新執(zhí)行該任務(wù)[5]。
1.2 實現(xiàn)機(jī)制
首先輸入文件會被用戶程序調(diào)用的MapReduce庫切分成若干64MB大小的數(shù)據(jù)塊,這些數(shù)據(jù)塊在框架中會被自動地創(chuàng)建副本,然后調(diào)用fork原語創(chuàng)建數(shù)個子程序,在這些子程序中有一個特殊程序master,其他程序都被當(dāng)作worker程序,一個空閑的worker程序?qū)⑹盏接蒻aster分配的一個map任務(wù)和reduce任務(wù)。
worker程序從輸入的數(shù)據(jù)片段中解析出鍵值對,然后這些鍵值對被傳遞給用戶自定義實現(xiàn)的map函數(shù),由map函數(shù)處理后,生成的中間鍵值對最終被寫到本地磁盤上,master獲取到這些鍵值對在本地磁盤上的存儲位置并傳送給負(fù)責(zé)執(zhí)行reduce任務(wù)的worker節(jié)點。
reduce worker程序使用RPC從負(fù)責(zé)執(zhí)行map任務(wù)的worker節(jié)點所在主機(jī)的磁盤上讀取這些緩存數(shù)據(jù),由于存在不同的key值映射到相同的reduce任務(wù)上,因此會進(jìn)行排序,如果中間數(shù)據(jù)太大無法在內(nèi)存中完成排序,那么就要進(jìn)行外排序[5],最終使得具有相同key值的數(shù)據(jù)聚合在一起。
排序后的中間數(shù)據(jù)經(jīng)過reduce worker遍歷后,reduce worker程序?qū)⒚恳粋€惟一的中間key值和它相關(guān)的中間value值的集合傳遞給用戶自定義的reduce函數(shù)[5]。
當(dāng)所有的map、reduce任務(wù)完成后,用戶程序?qū)ap、reduce的調(diào)用才返回。
2 Shuffle過程
Shuffle過程可以理解為是從map映射輸出到reduce消化輸入的整個過程,整個過程被稱為MapReduce的“心臟”,關(guān)乎整個框架性能[1],應(yīng)用開發(fā)人員對MapReduce框架的改進(jìn)也主要集中在Shuffle階段。
MapReduce框架中Shuffle流程如圖1所示[1]。
2.1 Map端
⑴ 作業(yè)提交以后,Map端從輸入塊中讀取record,依次調(diào)用map函數(shù)進(jìn)行處理,并映射輸出(key,value)鍵值對。
⑵ 生成的鍵值對串行化的輸出到環(huán)形內(nèi)存緩沖區(qū)[7],這期間如果應(yīng)用程序員沒有定制partitioner,那么系統(tǒng)會調(diào)用默認(rèn)的HashPartitioner把鍵值對劃分到對應(yīng)的partition分區(qū)。
⑶ 當(dāng)緩沖區(qū)的內(nèi)容達(dá)到設(shè)定的閾值時,一個后臺spill線程便開始把這些內(nèi)容溢寫到磁盤[1],spill線程在把緩沖區(qū)數(shù)據(jù)寫到磁盤前,會對它進(jìn)行一個二次快速排序,首先根據(jù)所屬的partition排序,然后在每個partition內(nèi)再按key排序[8]。
⑷ 當(dāng)緩沖區(qū)的數(shù)據(jù)輸出到磁盤后,可能會出現(xiàn)多個spill文件,這時候就進(jìn)入Map端的合并(merge)階段,會對這些文件做一個歸并排序,最后輸出一個index文件和一個數(shù)據(jù)文件,index文件記錄了不同的key在數(shù)據(jù)文件中偏移量[7]。
2.2 Reduce端
⑴ Copy Phase:在Map端,只要有一個任務(wù)完成,Reduce任務(wù)就開始通過http方式復(fù)制其輸出,而不是等所有Map端任務(wù)都結(jié)束之后再進(jìn)行[1]。
⑵ Sort Phase:Map端輸出在被取回的同時,被合并成多個文件,并按鍵排序,這個階段更恰當(dāng)?shù)卣f,應(yīng)該只進(jìn)行了合并,因為排序(Sort)是在Map端進(jìn)行的[1]。
⑶ Merge Phase:歸并過程中的數(shù)據(jù)可能既有在內(nèi)存中的也有在磁盤上的,如果都在內(nèi)存中,則會直接被復(fù)制到Reduce端而省去向磁盤寫這一步[8],然后通過歸并得到最終的文件。
3 Combine優(yōu)化
3.1 Combine優(yōu)化機(jī)制簡介
MapReduce框架的運作基于鍵值對,即數(shù)據(jù)的輸入是鍵值對,生成的結(jié)果也是存放在集合里的鍵值對,其中鍵值對的值也是一個集合,一個MapReduce任務(wù)的執(zhí)行過程以及數(shù)據(jù)輸入輸出的類型如下所示,這里我們定義list表示集合:
map(K1, V1) -> list(K2, V2)
combine(K2, list(V2)) -> list(K2, V2)
reduce(K2, list(V2)) -> list(K3, V3)
map函數(shù)操作所產(chǎn)生的鍵值對會作為combine函數(shù)的輸入,經(jīng)combine函數(shù)處理后再送到reduce函數(shù)進(jìn)行處理,減少了寫入磁盤的數(shù)據(jù)量,同時也減少了網(wǎng)絡(luò)中鍵值對的傳輸量。在Map端,用戶自定義實現(xiàn)的Combine優(yōu)化機(jī)制類Combiner在執(zhí)行Map端任務(wù)的節(jié)點本身運行,相當(dāng)于對map函數(shù)的輸出做了一次reduce[8]。
集群上的可用帶寬往往是有限的,產(chǎn)生的中間臨時數(shù)據(jù)量很大時就會出現(xiàn)性能瓶頸,因此盡量避免Map端任務(wù)和Reduce端任務(wù)之間大量的數(shù)據(jù)傳輸是很重要的。使用Combine機(jī)制的意義就在于使Map端輸出更緊湊,使得寫到本地磁盤和傳給Reduce端的數(shù)據(jù)更少[1]。
選用Combine機(jī)制下的Combiner雖然減少了IO,但是等于多做了一次reduce,所以應(yīng)該查看作業(yè)日志來判斷combine函數(shù)的輸出記錄數(shù)是否明顯少于輸入記錄的數(shù)量,以確定這種減少和花費額外的時間來運行Combiner相比是否值得[8]。
在具體的分布式應(yīng)用中,應(yīng)用開發(fā)人員需要自己定制map函數(shù)和reduce函數(shù),在本文實驗中用到的具體算法如下:
Algorithm 1是分割映射鍵值對的map算法。map函數(shù)的輸入分別為文本數(shù)據(jù)中行偏移量key、每行文本內(nèi)容text和上下文對象context,算法第1行將輸入的每行文檔內(nèi)容數(shù)據(jù)text分割成單個的單詞k,放入集合K中,第2-3行對K中的每個k,映射輸出鍵值對(k,1),這里每個鍵k對應(yīng)的值是1。
Algorithm 2是對中間鍵值對合并的combine算法。combine函數(shù)的輸入為經(jīng)map函數(shù)分割后的每個單詞k、單詞k及其統(tǒng)計次數(shù)存放的集合K1和上下文對象context,其中鍵值對的輸入鍵midsum為經(jīng)過combine算法后分發(fā)出的一類鍵名,用以標(biāo)識鍵值對已經(jīng)經(jīng)過combine算法處理。算法第1行首先判斷輸入鍵是否為midsum,如果不是,則第2-4行對K1集合中的統(tǒng)計次數(shù)進(jìn)行累加,輸出鍵值對midsum作為鍵,累加結(jié)果sum作為值;第5-7行因為輸入鍵是midsum,則直接映射輸出,避免輪詢造成更大的開銷。
Algorithm 3是規(guī)約reduce算法。reduce函數(shù)的輸入可能是combine函數(shù)的輸出(midsum,sum)(存放于集合K2中),也可能是沒有經(jīng)過combine函數(shù)合并的map函數(shù)的輸出(k,1) (仍然存放于集合K1中),還有上下文對象context,算法第1-2行對K1/K2集合中每個k進(jìn)行累加求和,第3行輸出最終的鍵值對(endsum,sum),sum即為最終的求和結(jié)果。
算法1至3如圖2所示。
[Algorithm 1:分割鍵值對map算法
輸入:key, text, context
輸出:中間鍵值對(k,v)
步驟:
1 list(K) ← split text
2 foreach k ∈ K
3 emit intermediate(k,1);][Algorithm 3:規(guī)約reduce算法
輸入:key, K1/K2, context
輸出:求和結(jié)果
步驟:
1 foreach k ∈ K1/K2
2 compute the sum of v
3 emit (endsum,sum);\&][Algorithm 2:合并中間鍵值對combine算法
輸入:key, K1, context
輸出:中間鍵值對(k,v)
步驟:
1 if k does not start with midsum
2 foreach k ∈ K1
3 compute the sum of v
4 emit intermediate(midsum,sum);
5 else
6 foreach k ∈ K1
7 emit intermediate(k,v);
3.2 Combine優(yōu)化機(jī)制執(zhí)行時機(jī)
⑴ Map端spill的時候
在Map端內(nèi)存緩沖區(qū)進(jìn)行溢寫的時候,數(shù)據(jù)會被劃分成相應(yīng)分區(qū),后臺線程在每個partition內(nèi)按鍵進(jìn)行內(nèi)排序。這時如果指定了Combiner,并且溢寫次數(shù)最少為3(min.num.spills.for.combine屬性的取值)時,Combiner就會在排序后輸出文件寫到磁盤之前運行[1]。
⑵ Map端merge的時候
在Map端寫磁盤完畢前,這些中間的輸出文件會合并成一個已分區(qū)且已排序的輸出文件,按partition循環(huán)處理所有文件,合并會分多次,這個過程也會伴隨著Combiner的運行[7]。
⑶ Reduce端merge的時候
從Map端復(fù)制過來數(shù)據(jù)后,Reduce端在進(jìn)行merge合并數(shù)據(jù)時也會調(diào)用Combiner來壓縮數(shù)據(jù)。Combiner通常被看作是一個Map端的本地reduce函數(shù)的實現(xiàn)類Reducer,這個實驗[9]也驗證了這一理論的不足,但是在很多情況下Combiner在Reduce端的作用是有限的。
3.3 Combine優(yōu)化機(jī)制運行條件
⑴ 滿足交換和結(jié)合律[10]
結(jié)合律:
(1)+(2+3)+(4+5+6)==(1+2)+(3+4)+(5)+(6)== ...
交換律:
1+2+3+4+5+6==2+4+6+1+2+3== ...
應(yīng)用程序在滿足如上的交換律和結(jié)合律的情況下,combine函數(shù)的執(zhí)行才是正確的,因為求平均值問題是不滿足結(jié)合律和交換律的,所以這類問題不能運用Combine優(yōu)化機(jī)制來求解。
例如:mean(10,20,30,40,50)=30
但mean(mean(10,20),mean(30,40,50))=22.5
這時在求平均氣溫等類似問題的應(yīng)用程序中使用Combine優(yōu)化機(jī)制就會出錯。
另外需注意,在撰寫含有Combiner的應(yīng)用時,對于所有map函數(shù)的輸出,并非一定都經(jīng)過Combiner,有些會直接進(jìn)入Reducer。
如果我們在程序中定制了一個Combiner,MapReduce框架使用它的次數(shù)可能是0次也可能是多次,為了保證Combine機(jī)制的正確運用,Combiner在數(shù)據(jù)的轉(zhuǎn)換上必須與Reducer等價,如果我們?nèi)サ鬋ombiner,Reducer的輸出也應(yīng)保持不變,而且,當(dāng)Combiner被應(yīng)用于中間數(shù)據(jù)的任意子集時,仍需保持等價的轉(zhuǎn)換特性[9]。
⑵ Combine優(yōu)化機(jī)制中存在的輪詢問題
在開發(fā)過程中,使用Combine優(yōu)化機(jī)制會存在輪詢問題,即一個combine函數(shù)的輸出結(jié)果可能會成為其自身的輸入[11],經(jīng)過combine函數(shù)處理的數(shù)據(jù)會再次進(jìn)入combine函數(shù),但輪詢問題是不可避免的,所以要保證combine函數(shù)的輸入類型和輸出類型必須一致,若不一致,要增加判斷的邏輯。
4 實驗結(jié)果分析
4.1 實驗環(huán)境
本文通過部署一個Hadoop偽分布環(huán)境,通過實驗對比和分析來驗證文中關(guān)于利用Combine優(yōu)化機(jī)制理論的正確性。在實驗搭建的集群中,NameNode和DataNode都在本機(jī)中。
硬件環(huán)境:Intel? Core? i5 CPU M 480 @ 2.67GHz×4,3.7GB內(nèi)存,500GB硬盤。
軟件環(huán)境:Fedora16,Hadoop 0.21.0,Jdk-6u27-linux-i586。
4.2 實驗結(jié)果及分析
本文采用數(shù)字求和程序作為實驗測試程序,采用文本文檔作為實驗數(shù)據(jù)集,文本中數(shù)據(jù)隨機(jī)生成,正負(fù)相間,以防數(shù)據(jù)全正或全負(fù)致使計算結(jié)果溢出。
⑴ 實驗一
采用418.9MB文本文檔數(shù)據(jù)集進(jìn)行多次實驗,分析實驗日志,取其中三組實驗結(jié)果列表,利用Combine機(jī)制優(yōu)化前后,框架中各部分的輸入輸出和耗時對比如表1所示。
作業(yè)結(jié)束后,查看日志,利用Combine機(jī)制優(yōu)化前后作業(yè)都分別共執(zhí)行了12個map任務(wù)和1個reduce任務(wù),表1中詳細(xì)記錄了作業(yè)執(zhí)行完成后map函數(shù)、combine函數(shù)和reduce函數(shù)的輸入輸出數(shù)量和耗時,通過表1的數(shù)據(jù)對比可以看出:一方面,使用Combine優(yōu)化機(jī)制優(yōu)化后,作業(yè)執(zhí)行時間明顯減少;另一方面,從實驗數(shù)據(jù)中可以看到combine函數(shù)輸入的數(shù)量要明顯大于map函數(shù)輸出的數(shù)量,而且combine函數(shù)輸出的數(shù)量要明顯大于reduce函數(shù)輸入的數(shù)量,所以可以驗證在使用Combine優(yōu)化過程中存在明顯的輪詢問題,但由于實驗中輸入輸出數(shù)據(jù)類型相同,所以輪詢并不影響實驗的最終輸出結(jié)果,Combiner的輸出結(jié)果被寫到中間文件,并被發(fā)送到reduce任務(wù)中,經(jīng)reduce函數(shù)處理后直接被寫到最終的輸出文件中,保存在HDFS文件系統(tǒng)上。
⑵ 實驗二
分別采用41.9MB、83.8MB、167.6MB、335.1MB、418.9MB文本文檔作為測試數(shù)據(jù)集。
圖3為利用Combine優(yōu)化機(jī)制優(yōu)化前后作業(yè)執(zhí)行總耗時對比,在圖3中可以看到,隨著數(shù)據(jù)集增大,利用Combine機(jī)制優(yōu)化過的作業(yè)在總耗時方面時間減少明顯,性能平均提高43%,可以充分驗證使用Combine優(yōu)化機(jī)制會大量減少Map端最后寫到磁盤的數(shù)據(jù)量,同時也減少了網(wǎng)絡(luò)中傳輸?shù)臄?shù)據(jù)量,大幅提高系統(tǒng)性能。
圖4為利用Combine優(yōu)化機(jī)制優(yōu)化前后Map端計算耗時對比,可以看到,利用Combine優(yōu)化機(jī)制優(yōu)化后,計算機(jī)壓力雖然大部分在Map端,但是Map端耗時隨著數(shù)據(jù)集增大而減少也很明顯。
圖5為利用Combine優(yōu)化機(jī)制優(yōu)化前后Reduce端耗時對比,因為數(shù)據(jù)求和的實驗在利用Combine優(yōu)化機(jī)制優(yōu)化后,計算壓力大部分都在Map端,所以可以看到當(dāng)數(shù)據(jù)集增大時,Reduce端執(zhí)行時間基本都維持在4秒左右。
5 結(jié)束語
本文通過對Shuffle流程的詳細(xì)分析,指出Combine機(jī)制在Shuffle流程中的具體執(zhí)行位置,并詳細(xì)分析了Combine優(yōu)化機(jī)制執(zhí)行所需條件和執(zhí)行過程中存在的問題。通過實驗分析,可以看到Combine機(jī)制對性能的提升確實很大,但在實際開發(fā)應(yīng)用中應(yīng)該結(jié)合文中所述和實際開發(fā)需要,評測是否需要利用Combine優(yōu)化機(jī)制,只有在網(wǎng)絡(luò)帶寬資源有限,對系統(tǒng)的瓶頸比較大時才應(yīng)該考慮使用Combine機(jī)制,以減少數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸量,降低系統(tǒng)對網(wǎng)絡(luò)帶寬的需求。
參考文獻(xiàn):
[1] Tom White.Hadoop權(quán)威指南[M].清華大學(xué)出版社,2011.
[2] 周一可.云計算下MapReduce 編程模型可用性研究與優(yōu)化[D].上海交通大學(xué),2011.
[3] Jeffrey Dean, Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters[C],2004.
[4] Wikipedia. MapReduce介紹.2012年09月22日引自http://zh.wikipedia.org/wiki/MapReduce
[5] 王凱.MapReduce集群多用戶作業(yè)調(diào)度方法的研究與實現(xiàn)[D].國防科學(xué)技術(shù)大學(xué),2010.11.
[6] 徐強(qiáng),王振江.云計算應(yīng)用開發(fā)實踐[M].機(jī)械工業(yè)出版社,2012.
[7] gaobotian. Hadoop源代碼分析(完整版). 2012年09月22日引自http://wenku.baidu.com/view/ffc10130eefdc8d376ee32ec.html
[8] 皮冰鋒等.Hadoop開發(fā)者第一期.2012年9月22日引自http://www.hadoopor.com/
[9] Lam, C著,韓冀中譯.Hadoop實戰(zhàn)[M].人民郵電出版社,2011.
[10] StackOverFlow. "Combiner" Class in a mapreduce job. 2012年9月23日引自http://stackoverflow.com/questions/10220371/Combiner-class-in-a-mapreduce-job
[11] 何忠育等.Hadoop開發(fā)者第四期.2012年9月23日引自 http://www.hadoopor.com/