劉 娟,豆育升,何 晨,唐 紅
(1.重慶郵電大學(xué) 計(jì)算機(jī)學(xué)院,重慶400065;2.重慶郵電大學(xué) 高性能計(jì)算與應(yīng)用研究所,重慶400065)
Hadoop是開源的云計(jì)算[1]架構(gòu),主要由 MapReduce[2]編程模型 和 HDFS(hadoop distributed file system)[3]文件系統(tǒng)組成。目前,對Hadoop性能優(yōu)化的研究主要有兩種方法,一是基于配置文件的性能優(yōu)化,從配置文件入手,改變配置參數(shù)以提高Hadoop集群的性能。主要的配置文件有 Conf下 面 的 Core-site.xml,Hdfs-site.xml和 Mapredsite.xml[4],這種優(yōu)化方法在一定程度上能優(yōu)化集群性能,但是也具有一定的局限性。一方面每個集群的硬件配置并不完全相同,每種優(yōu)化方法并不一定適合所有的集群。另一方面,這種方法只能靜態(tài)地對集群的配置參數(shù)作修改,在任務(wù)運(yùn)行中不能根據(jù)需要動態(tài)地改變配置文件并使其生效。第二種方法是優(yōu)化Hadoop調(diào)度器。因?yàn)檎{(diào)度器一旦在啟動,整個任務(wù)運(yùn)行過程將根據(jù)需要自適應(yīng)變化,并且適用于不同硬件平臺下的Hadoop集群,所以對Hadoop調(diào)度器的研究具有很大的現(xiàn)實(shí)意義。目前,Hadoop的調(diào)度器有3種,分別是Hadoop默認(rèn)的FIFO[5]調(diào)度器,計(jì)算能力調(diào)度器[5],公平份額調(diào)度器[5]。其中FIFO是所有調(diào)度器的基礎(chǔ),然而此調(diào)度器按照作業(yè)提交先后順序?qū)⒆鳂I(yè)排序,再根據(jù)這一順序逐一把任務(wù)分發(fā)給各個節(jié)點(diǎn)執(zhí)行,這就忽略了每個節(jié)點(diǎn)的實(shí)際負(fù)載情況。計(jì)算能力調(diào)度器能很好地支持內(nèi)存密集型作業(yè),公平調(diào)度器只是盡可能給每個任務(wù)分配等同的資源,都不能很好解決靈活性問題。本文即是在Hadoop默認(rèn)調(diào)度器的基礎(chǔ)上,提出一種基于CPU占用率作為負(fù)載指標(biāo)的動態(tài)調(diào)度算法。該算法能有效解決默認(rèn)FIFO調(diào)度器缺乏動態(tài)性和靈活性的問題,進(jìn)而縮短Hadoop集群的任務(wù)整體響應(yīng)時間。
Hadoop由核心組件MapReduce編程模型和HDFS分布式文件系統(tǒng)以及其他一些輔助組件組成。如圖1所示,其中,MapReduce編程模型負(fù)責(zé)Hadoop所有的數(shù)據(jù)流和控制流,貫穿整個作業(yè)執(zhí)行始末。JobTracker統(tǒng)一調(diào)度和分發(fā)任務(wù),TaskTracker負(fù)責(zé)每一個子任務(wù)的執(zhí)行,直到任務(wù)運(yùn)行完畢。MapReduce的設(shè)計(jì)思想是:一個任務(wù)可以拆成多個子任務(wù)同時執(zhí)行,然后將分解的多個任務(wù)按要求進(jìn)行處理,將中間結(jié)果歸并后統(tǒng)計(jì)出最后結(jié)果。MapReduce由Map和Reduce兩部分組成,其中Map處理一個key/value對生成的中間鍵值對集合,Reduce接受一個中間key和它對應(yīng)的值的集合并合并這些值以形成一個較小的值集合。MapReduce數(shù)學(xué)模型如下:
圖1 Hadoop數(shù)據(jù)流和控制流
map:(k1,v1)→(list(k2,v2)[6]
reduce:(k2,list(v2))→(list(k3,v3)[6]
HDFS通過塊級數(shù)據(jù)的分布冗余存儲,負(fù)責(zé)所有臨時的或者永久的數(shù)據(jù)存儲工作。它采用主從模型,包含一個NameNode和一系列的DataNode,
NameNode負(fù)責(zé)管理HDFS文件系統(tǒng),接受用戶的請求,DataNode則用來存儲數(shù)據(jù)文件。Hadoop整合 Map-Reduce和HDFS以及其他輔助層,將 Map-Reduce中的TaskTracker和HDFS中的DataNode部署在同一個計(jì)算節(jié)點(diǎn)上。
Hadoop默認(rèn)的是FIFO調(diào)度器,用戶在Hadoop客戶端提交任務(wù),調(diào)度器將所有用戶的作業(yè)提交到一個隊(duì)列中,JobTracker根據(jù)作業(yè)提交的先后順序?qū)⒆鳂I(yè)排序,再根據(jù)這一順序選擇將要調(diào)度的任務(wù)并將任務(wù)分發(fā)給TaskTracker。TaskTracker接收J(rèn)obTra-cker分配的任務(wù)并執(zhí)行[7-8]。FIFO調(diào)度器使得JobTracker的工作負(fù)擔(dān)較輕,每個Job都公平共享整個集群,但是同時也失去了靈活性和JobTracker動態(tài)調(diào)度的可能性。JobTracker不能把握每個Task-Tracker的實(shí)時負(fù)載能力,因?yàn)槊總€TaskTracker別無選擇,只能被動地接受JobTracker分發(fā)的任務(wù)[9]。這樣使得繁忙的節(jié)點(diǎn)更繁忙,空閑的節(jié)點(diǎn)更空閑,造成了系統(tǒng)資源的浪費(fèi)。
Hadoop由JobTracker/TaskTracker主從結(jié)構(gòu)[10]組成,且JobTracker在Hadoop集群中有且只有一個。用戶提交任務(wù)給JobTracker后,在JobTracker的構(gòu)造函數(shù)中,生成一個TaskScheduler成員變量,即默認(rèn)的FIFO調(diào)度器,進(jìn)行Job的調(diào)度,在JobTracker的OfferService函數(shù)中,調(diào)用TaskScheduler的Start函數(shù)啟動FIFO調(diào)度器,調(diào)度器根據(jù)初始化配置和集群情況調(diào)度和分配任務(wù)。TaskTracker準(zhǔn)備就緒后,向JobTracker報(bào)告自己當(dāng)前的狀態(tài)。而JobTracker返回給TaskTracker的HeartbeatResponse中已經(jīng)包含了分配好的任務(wù)LaunchTaskAction。TaskTracker接收該任務(wù),并根據(jù)任務(wù)類型(Map任務(wù)或者Reduce任務(wù))執(zhí)行任務(wù)。JobTracker/TaskTracker調(diào)度簡圖如圖2所示。
圖2 JobTracker/TaskTracker調(diào)度簡圖
FIFO調(diào)度器只能完成簡單的任務(wù)分發(fā)和執(zhí)行,每個Job公平共享整個集群,但是JobTracker無法根據(jù)當(dāng)前TaskTracker的負(fù)載情況實(shí)時判斷當(dāng)前節(jié)點(diǎn)是否還能繼續(xù)高效地執(zhí)行任務(wù)?;诖颂岢鲆环N改進(jìn)算法:在FIFO調(diào)度器的基礎(chǔ)上,實(shí)時獲取每個節(jié)點(diǎn)的CPU占用率,根據(jù)每個節(jié)點(diǎn)當(dāng)前的CPU占用率判斷節(jié)點(diǎn)的負(fù)載狀態(tài),將此占用率放入心跳包(HeartBeat)中,并反饋給JobTracker。當(dāng)Job-Tracker啟動調(diào)度器調(diào)度任務(wù)的時候,取出該值與CPU閾值比較,判斷當(dāng)前節(jié)點(diǎn)負(fù)載情況,從而決定是否應(yīng)該繼續(xù)給當(dāng)前節(jié)點(diǎn)分配任務(wù)。調(diào)度流程如圖3所示。
圖3 Hadoop改進(jìn)后的調(diào)度流程
該算法在每個TaskTracker中執(zhí)行一個線程來獲取CPU占用率。當(dāng)JobClient類的SubmitJob函數(shù)提交Job后,JobTracker接受該任務(wù),創(chuàng)建并初始化與Job有關(guān)的參數(shù)和一系列用來管理和調(diào)度任務(wù)的對象。Job分割成子任務(wù)后,由TaskTracker執(zhí)行任務(wù)。即TaskTracker的Run函數(shù)一直鏈接JobTracker,如果鏈接成功,TaskTracker的OfferService函數(shù)會定期與JobTracker通信一次,報(bào)告自己任務(wù)的執(zhí)行狀態(tài)并接受JobTracker指令。TaskTracker還會調(diào)用TransmitHeartBeat函數(shù)獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponse的getActions函數(shù)獲得JobTracker傳過來的指令即TaskTrackerAction數(shù)組,再根據(jù)數(shù)組類型決定應(yīng)該執(zhí)行的任務(wù)類型。TaskTracker和Job-Tracker的通信是由HeartBeat方法實(shí)現(xiàn)的。在原始版本的基礎(chǔ)上,添加一個獲取CPU占用率的線程,在OfferService函數(shù)中每隔一個心跳間隔啟動一次該線程更新一次CPU占用率,這樣就保證每次獲取到的CPU占用率是最新的。將此占用率傳遞給HeartBeat函數(shù),每次JobTracker和TaskTracker通信的時候,都會向JobTracker報(bào)告該CPU占用率,實(shí)時地與CPU閾值作比較,JobTracker再根據(jù)當(dāng)前節(jié)點(diǎn)的負(fù)載情況分配任務(wù)。這樣就做到了任務(wù)分配的實(shí)時性和動態(tài)性。以某節(jié)點(diǎn)為例,作浮點(diǎn)數(shù)運(yùn)算測試,CPU占用率達(dá)到峰值0.95(閾值)時,CPU計(jì)算性能逐漸下降??梢缘弥?,當(dāng)實(shí)時獲取的CPU占用率大于此閾值時,說明當(dāng)前CPU處于繁忙狀態(tài)。此時,JobTracker不能再向此節(jié)點(diǎn)分配任務(wù),應(yīng)該將任務(wù)分配給處于空閑狀態(tài)的節(jié)點(diǎn)執(zhí)行。直到當(dāng)前節(jié)點(diǎn)的CPU占用率小于閾值時,Job-Tracker再繼續(xù)向該節(jié)點(diǎn)分配任務(wù),循環(huán)直到任務(wù)全部執(zhí)行完成。這樣就可以實(shí)時動態(tài)地調(diào)整集群的負(fù)載狀況,進(jìn)而使作業(yè)整體響應(yīng)效率提高。
本算法采用Java語言,將新增代碼添加到Hadoop工程源代碼中,在eclipse平臺上重新編譯生成jar包。
輸入:用戶提交的任務(wù)
輸出:map/reduce計(jì)算出的結(jié)果
S1/*初始化集群參數(shù) */
double cpu_occupation=-1;/* 默認(rèn) CPU 占用率*/
S2/*獲取cpu占用率,intime_data是/proc/stat下的時間類*/
class SystemInfo implements Runnable
{public void get_occupy(intime_data o)
{File file=new File("/proc/stat");
BufferedReader br=new BufferedReader(new Input-StreamReader(new FileInputStream(file)));
String str=br.readLine();}
public double call_occupy(intime_data old,
intime_data new)
{return(1-(itime/(ntime-otime)))*100.0;}
}/*itime是空閑時間,(ntime-otime)為獲取時間的時間間隔,返回獲取的CPU占用率,此函數(shù)在SystemInfo線程中調(diào)用,SystemInfo將作為一個線程在每個Task-Tracker的offerservice()中啟動,一直執(zhí)行,實(shí)時獲取CPU占用信息*/
S3/*根據(jù)實(shí)時獲取的CPU占用率分配任務(wù)給Task-Tracker,通過Hadoop RPC機(jī)制將TaskTracker中的CPU占用率傳入JobTracker中,JobTracker再將此占用率傳入默認(rèn)FIFO調(diào)度器中,調(diào)度器的任務(wù)分配函數(shù)分析判斷后分配任務(wù)*/
HeartbeatResponse heartbeatResponse=
jobClient.heartbeat(status,justStarted,justInit-ed,askForNewTask,heartbeatResponseId,cpu _ occupation);/*將獲得的cpu_occupation傳入heartbeat心跳包中*/
{tasks=taskScheduler.assignTasks(taskTracker,
get(trackerName),cpu_occupation);
List<Task>assignTasks(TaskTracker
taskTracker,double cpu_occupan)
{/*初始化調(diào)度參數(shù)和環(huán)境*/
scheduleMaps:
{/*調(diào)度正式開始*/
if(cpu_occupan>=閾值)/*比較CPU占用率和閾值大?。?/p>
{break scheduleMaps;}
else/*否則給TaskTracker分配任務(wù)*/
{/*計(jì)算可獲得的時間槽*/
job.obtainNewLocalMapTask();
/*將任務(wù)傳遞給JVM執(zhí)行*/}}
}
S4/*任務(wù)執(zhí)行完畢,完成臨時數(shù)據(jù)和狀態(tài)的清理工作,Hadoop集群完成任務(wù)*/
操作系統(tǒng)Cenos5.6,帶寬100M,8個節(jié)點(diǎn),Intel雙核,硬盤250G,內(nèi)存2GB,jdk1.6.0-21,Hadoop源代碼版本 hadoop-0.21.0。
本文基于Hadoop-0.21.0版本實(shí)現(xiàn)改進(jìn)算法。將改進(jìn)的源代碼在eclipse上編譯成jar包,分別是hadoopcore.jar,hadoop-mapred.jar,hadoop-h(huán)dfs.jar,將 3 個jar包分別部署在Hadoop集群的每個節(jié)點(diǎn)上并重啟集群使其生效。實(shí)驗(yàn)采用Hadoop系統(tǒng)自帶的terasort(計(jì)算集中型)基準(zhǔn)測試程序。該程序?qū)崿F(xiàn)數(shù)據(jù)排序的功能,是典型的CPU密集型程序,適用于本改進(jìn)算法。實(shí)驗(yàn)分別對1百萬字節(jié),2百萬字節(jié),4百萬字節(jié)和5百萬字節(jié)的數(shù)據(jù)排序,在原始版本和改進(jìn)版本上分別測試。為了減小測試結(jié)果的隨機(jī)性,實(shí)驗(yàn)分別測試了10組數(shù)據(jù)取其平均值作為最終測試結(jié)果。實(shí)驗(yàn)結(jié)果如表1,2,3,4所示。表中記錄了四組數(shù)據(jù)原始版和改進(jìn)版的任務(wù)整體響應(yīng)時間??梢缘贸?,運(yùn)行于改進(jìn)版的四組數(shù)據(jù)的任務(wù)整體響應(yīng)效率都有不同程度的提高,分別提高了2.1秒,3.8秒,5.4秒,7.8秒。直觀對比如圖4所示。
計(jì)算得出,改進(jìn)版比原始版的作業(yè)整體響應(yīng)效率(任務(wù)提高的時間/原始版本任務(wù)執(zhí)行時間)至少提高了6%,如圖6所示。并且,隨著任務(wù)數(shù)據(jù)量的不斷增加,任務(wù)的整體響應(yīng)效率有快速提高的趨勢,這將更利于長作業(yè)的運(yùn)行。雖然實(shí)驗(yàn)測試數(shù)據(jù)呈倍數(shù)增長,但是任務(wù)執(zhí)行效率的提升并沒有按照倍數(shù)增長。這是因?yàn)?,一方面,任何CPU的運(yùn)算能力是有上限的,默認(rèn)版本的CPU計(jì)算能力已經(jīng)達(dá)到或者超過了CPU運(yùn)算性能的最佳運(yùn)算狀態(tài),本算法只改進(jìn)了CPU的過載運(yùn)算部分,使得在不影響CPU計(jì)算能力的情況下,最快地完成任務(wù)。另一方面,改進(jìn)算法啟動一個線程實(shí)時獲取CPU占用率,此線程在實(shí)時獲取CPU占用率的同時也耗費(fèi)了一部分CPU計(jì)算資源。
表1 1000000B數(shù)據(jù)測試結(jié)果
表2 2000000B數(shù)據(jù)測試結(jié)果
表3 4000000B數(shù)據(jù)測試結(jié)果
表4 5000000B數(shù)據(jù)測試結(jié)果
本文深入分析并改進(jìn)了Hadoop默認(rèn)任務(wù)調(diào)度模型,提出的以CPU占用率作為負(fù)載指標(biāo),在循環(huán)分配任務(wù)時根據(jù)反饋的負(fù)載指標(biāo)判斷節(jié)點(diǎn)負(fù)載情況的算法,確實(shí)提高了Hadoop的任務(wù)執(zhí)行性能,最終縮短了任務(wù)整體響應(yīng)時間解決了默認(rèn)調(diào)度器缺乏動態(tài)性和靈活性的問題。經(jīng)過Hadoop-0.21.0版本算法改進(jìn)前后的實(shí)驗(yàn)對比分析可知,本算法在百萬數(shù)量級上,至少提高了Hadoop集群6%的任務(wù)整體響應(yīng)效率。本算法的最大缺點(diǎn)是沒有考慮內(nèi)存耗費(fèi)情況,只是用計(jì)算集中型應(yīng)用程序。未來將尋求一種耗費(fèi)系統(tǒng)資源更少的方法來判斷節(jié)點(diǎn)的動態(tài)性能。此外可以對任務(wù)進(jìn)行排序,采用優(yōu)先級策略實(shí)時響應(yīng)更緊迫的任務(wù)。
[1]LIU Peng.Cloud computing[M].2nd ed.Beijing:Publishing House of Electronics Industry,2011(in Chinese).[劉鵬.云計(jì)算[M].2版.北京:電子工業(yè)出版社,2011.]
[2]Ger-h(huán)ard W.Multiagent system:A modem approach to distributed artificial intelligence[M].[S,L]:MITPRE-SS,2007.
[3]LUO Yongjun,SHAO Zhiqing.Progress and prospects of standardization for agent technology[J].Computer Applications and Software,2009,26(3):179-183(in Chinese).[羅勇軍,邵志清.Agent技術(shù)的標(biāo)準(zhǔn)化進(jìn)度與前景[J].計(jì)算機(jī)應(yīng)用與軟件,2009,26(3):179-183.]
[4]LUAN Yajian,HUANG Chongmin,GONG Gaosheng,et al.Research on performance optimization of hadoop platform[J].Computer Engineering,2010,36(14):262-266(in Chinese).[欒亞建,黃翀民,龔高晟,等.Hadoop平臺的性能優(yōu)化研究[J].計(jì)算機(jī)工程,2010,36(14):262-266.]
[5]ZHAN Kunlin.Hadoop performance optimization[EB/OL].[2011-04-25] .http://wenku.baidu.com/vie/3a86c11118964bcf84b9d 57bce.html(in Chinese).[詹坤林.Hadoop性能優(yōu)化[EB/OL].http:// wenku.baidu.com/vie/3a86c11118964bcf84b9d57bce.html,2011-04-25.]
[6]Tom White.Hadoop:The definitive guide[M].Beijing:Tsinghua University Press,2011(in Chinese).[Tom White.Hadoop:權(quán)威指南[M].北京:清華大學(xué)出版社,2011.]
[7]TIAN C,ZHOU H,HE Y,et al.A dynamic mapreduce scheduler for heterogeneous workloads[C]//Proceedings of the Eighth International Conference on Grid and Cooperative Computing.Washington,DC,USA:IEEE Computer Society,2009:218-224.
[8]Polo J,Carrera D,Becerra Y,et al.Performance driven task co-scheduling for mapreduce environment[C]//Network Operations and Management Symposium IEEE,2010:373-380.
[9]WANG Feng.The Hadoop algorithm of cluster Job scheduling[J].Programmer,2009(12):119-121(in Chinese).[王峰.Hadoop集群的作業(yè)調(diào)度算法[J].程序員,2009(12):119-121.]
[10]SUN Zhaoyu,YUAN Zhiping,HUANG Yuguang.The application of hadoop on the data-intensive computing[C]//The High Performance Computing Conference,2008:441-443(in Chinese).[孫兆玉,袁志平,黃字光.面向數(shù)據(jù)密集型計(jì)算Hadoop及其應(yīng)用研究[C]//2008年全國高性能計(jì)算學(xué)術(shù)年會論文集,2008:44l-443.]