楊 浩,滕 飛,李天瑞,李 曌
(西南交通大學信息科學與技術學院,四川 成都610031)
快速發(fā)展的計算機技術、網絡技術和人機交互技術等,讓諸如天文、軍事、社交等領域的數據生成和采集更為便利和快捷,從而使得數據規(guī)模不斷增長,形成了大數據。處理大數據并從中提取有用的信息,需要良好的并行計算架構。MapReduce是云計算中很流行的并行計算框架之一[1],其優(yōu)勢在于隱藏了數據并行處理的細節(jié),讓用戶更專注于數據的處理邏輯[2]。MapReduce已被廣泛用于自然語言處理、機器學習、大規(guī)模圖像處理等領域[3]。
Hadoop是Apache軟件基金提供的基于MapReduce的開源云計算平臺。Hadoop集群由主節(jié)點和多個從節(jié)點構成[4]。主節(jié)點用于接收用戶提交的作業(yè),并將作業(yè)的Map任務和Reduce任務平均分配到各從節(jié)點上運行,以達到并行計算的目的。Hadoop將從節(jié)點上的計算資源抽象成插槽,每一個插槽在同一時刻可處理一個Map任務或者Reduce任務。Hadoop上調度器的功能是將空閑插槽分配給不同任務,因此調度算法的好壞直接關系到Hadoop集群的整體性能和資源的利用情況[5]。
實時作業(yè)分為軟實時作業(yè)和硬實時作業(yè)。軟實時作業(yè)是指作業(yè)的響應時間可以超過其截止時間;而硬實時作業(yè)是指作業(yè)的響應時間不允許超過其截止時間。硬實時作業(yè)是剛性的,超過截止時間的作業(yè)將不再有任何意義。Hadoop中現有的調度算法大多是針對集群的利用率、作業(yè)響應速度等方面設計的,而針對硬實時作業(yè)對截止時間要求的調度算法比較少。
本文研究Hadoop上的作業(yè)調度模型,設計并實現了一種基于作業(yè)空閑時間的硬實時調度器LSS。為了提高集群在處理硬實時作業(yè)時的成功率,減輕集群的負載,LSS設計過程中采用了以下四個機制:
(1)LSS利用空閑時間來描述作業(yè)的優(yōu)先級??臻e時間是指作業(yè)距離其截止時間前還可以延遲的最大時間??臻e時間越小的作業(yè),其緊急程度越大,作業(yè)的優(yōu)先級也會越高。
(2)LSS動態(tài)預測作業(yè)的空閑時間,并隨著時間的推移,LSS實時更新預測結果。LSS是根據作業(yè)運行過程中產生的數據對空閑時間進行預測的,實時更新有效地保證了預測結果的正確性。
(3)LSS實時更新作業(yè)在作業(yè)隊列中的優(yōu)先級順序,將空閑時間比較小的作業(yè)移到隊列前面,當有資源空閑時,空閑時間小的作業(yè)就會優(yōu)先執(zhí)行,這樣有效地保證了集群中時限作業(yè)的成功執(zhí)行。
(4)當預測出當前資源無法保證作業(yè)在截止期前完成時,LSS會提前將這些無法成功執(zhí)行的作業(yè)結束。這樣能有效地減輕集群的負載,同時增加其他時限作業(yè)成功執(zhí)行的可能性。
Hadoop默認調度器的調度算法是FIFO,適合批處理作業(yè),但是該調度器在多用戶并存、資源利用率等方面存在不足,因此Hadoop中出現了其他調度器,并分別從不同角度對作業(yè)調度進行了優(yōu)化。公平調度器[6]和容量調度器用以解決多用戶多隊列的調度問題。LATE調度器[7]針對集群的異構進行了優(yōu)化。MFS[5]使用需求向量來描述作業(yè)對各類資源的需求大小,滿足了不同作業(yè)對資源的不同需求。許丞等[8]將JobTracker上的資源管理和任務監(jiān)控分布到不同節(jié)點上,降低了主節(jié)點的負載。這些調度器對提高Hadoop集群的資源管理和任務調度都取得了一定成果,但是未能對作業(yè)的實時性進行優(yōu)化。
隨著用戶對云計算中作業(yè)時效性的要求越來越高,Hadoop平臺中也出現了一些實時調度器。Delay Scheduler[6]、DSFS[9]等根據數據本地性對作業(yè)的執(zhí)行進行了優(yōu)化。魏曉輝[10]設計了一種基于資源預測的Delay調度算法,解決了Delay調度算法中不合理等待問題。寧文瑜等人[11]設計的自適應延遲調度算法,動態(tài)調整作業(yè)的等待時間,優(yōu)化了公平性和數據本地性之間的平衡。Deadline Constraint Scheduler[4]在作業(yè)提交時判斷當前集群能否滿足作業(yè)的時限要求,如果無法滿足,則結束該作業(yè),這樣有效地保證了其他作業(yè)的成功執(zhí)行。董西成等人[12]設計了一種能同時處理實時作業(yè)和非實時作業(yè)的調度器。這些調度器雖然縮短了作業(yè)的整體響應時間,但是無法保證硬實時作業(yè)的時限要求。為提高集群在處理硬實時作業(yè)時的整體性能,本文設計并實現了LSS調度器。該調度器不僅可以保證硬實時作業(yè)的時限要求,而且還可以有效地減輕集群的負載。
LSS的關鍵在于空閑時間的估算,本小節(jié)將介紹其空閑時間的估算方法。
(1)空閑時間定義。
空閑時間是指作業(yè)距離其截止時間前還可以延遲的最大時間。式(1)為作業(yè)空閑時間的計算公式。
其中,TSpare表示作業(yè)的空閑時間;TDeadLine表示該作業(yè)的截止時間;TNow表示當前時間;TRemaining表示該作業(yè)剩余執(zhí)行時間,即如果系統(tǒng)中只有該作業(yè)執(zhí)行,作業(yè)還需要執(zhí)行的時間。
(2)剩余時間估計。
MapReduce分為Map階段和Reduce階段。在Map階段,如果任務分片設置合理,在誤差允許的范圍內,每個Map任務的執(zhí)行時間是一樣的。而對于Reduce階段,由于其執(zhí)行時間難以估計,可以粗略將Reduce任務的執(zhí)行時間等效于Map任務的執(zhí)行時間,即所有任務的執(zhí)行時間是相同的。本文用已經執(zhí)行完的任務的平均執(zhí)行時間來預測未完成的任務的執(zhí)行時間。式(2)為任務的平均執(zhí)行時間預測方法。
其中,TAvgTask表示任務的平均執(zhí)行時間,∑TFinishedTask表示所有執(zhí)行完的任務的執(zhí)行時間之和,NFinishedTask表示已經執(zhí)行完的任務的數目。
沒有執(zhí)行完的任務分為Running任務(正在執(zhí)行,但沒有執(zhí)行完的任務)和Pending任務(已經準備好,但沒有開始執(zhí)行的任務),而其中Running任務已經執(zhí)行了一段時間,剩余時間中應該不包括這部分。同時,考慮作業(yè)中未執(zhí)行完的子任務數目、子任務的并行度,以及Map任務和Reduce任務的并行度不一樣等因素,用式(3)計算TRemaining。
其中,NunFinishedMapTask表示未執(zhí)行完的Map任務的數目,NMapSlot表示集群中 Map插槽的槽數,NunFinishedReduceTask表示未執(zhí)行完的Reduce任務的數目,NReduceSlot表示集群中Reduce插槽的槽數,符號
表示向上取整,∑TRunningMapTask表示正在運行的 Map任務已經執(zhí)行的時間之和,∑TRunningReduceTask表示正在運行的Reduce任務已經執(zhí)行的時間之和。
(3)LSS中空閑時間估算。
將(3)中得到的TRemaining代入式(1),即可得到TSpare,如式(4)所示。
任務提交時無法計算TAvgTask。本文采用如下方式處理:在作業(yè)提交時,TSpare用式(5)來計算;而當作業(yè)有任務執(zhí)行完時,用式(4)來計算TSpare。
(1)預測并提前結束不能完成的作業(yè)。
對于傳統(tǒng)的硬實時作業(yè)調度器,當作業(yè)的截止時間大于當前時間時,就結束該作業(yè)。此調度過程沒有考慮作業(yè)不能成功執(zhí)行的其它情況。當TSpare<0時,即使讓作業(yè)繼續(xù)執(zhí)行,該作業(yè)也不會在截止時間之前完成,所以LSS還增加了另外一個結束作業(yè)的條件:當TSpare<0時,結束該作業(yè)。由于估算出來的TSpare有時會偏小,為了避免將能正常執(zhí)行的作業(yè)結束,所以在其上加了一個閾值TAvgTask,即當TSpare+TAvgTask<0時,結束該作業(yè)。
(2)LSS調度流程。
Hadoop集群上的節(jié)點分為主節(jié)點和從節(jié)點,是一種典型的主從式(Master/Slave)架構[13]。和調度器相關的守護進程為主節(jié)點上的JobTracker和從節(jié)點上的TaskTracker。JobTracker守護進程負責為作業(yè)隊列分配資源,而TaskTracker負責Map任務或者Reduce任務在從節(jié)點中的插槽上運行。當從節(jié)點上有插槽空閑時,則TaskTracker通過心跳機制將該信息發(fā)送給JobTracker,Job-Tracker通過scheduleTasks()函數為TaskTracker對應的節(jié)點分配資源。算法1描述了LSS中的該過程,本算法包括以下幾個步驟:(1)重新計算作業(yè)隊列中各作業(yè)的平均時間和空閑時間;(2)將超過截止時間的作業(yè)或者空閑時間小于給定閾值的作業(yè)結束;(3)根據空閑時間對作業(yè)隊列排序;(4)獲取TaskTracker對應從節(jié)點的空閑插槽;(5)為剛提交的作業(yè)分配一個Map插槽;(6)按照作業(yè)隊列中的順序為作業(yè)分配插槽。
算法1 scheduleTasks()
輸入:jobQueue為作業(yè)隊列,t為TaskTracker。
輸出:assignedTasks為分配的任務列表。
開始:
1. synchronized(jobQueue);∥同步作業(yè)隊列
2. calAvgTime(jobQueue);∥計算平均時間
3. calSpareTime(jobQueue);∥計算空閑時間
4. killJobByDeadline(jobQueue);
5. killJobBySpareTime(jobQueue);
6. sortBySpareTime(jobQueue);∥排序
7. synchronized (t);∥同步t
8. generate(freeMapSlotsList,t);/*獲取t中空閑Map插槽*/
9. generate(freeReduceSlotsList,t);/*空閑 Reduce插槽*/
10. FORjobin jobQueue DO
11. IF numFreeMapSlots==0THEN
12. BREAK
13. END IF
14. IF job.getNumfinishedTasks()+job.get-NumRunningTasks()==0THEN
15. assignMapTask(job.getUnassignedMapTasks(),freeMapSlotsList,1);
16. assignedTasks.append(Maps);
17. END IF
18. END FOR
19. FORjobin jobQueue DO
20. IF numFreeMapSlots==0THEN
21. BREAK
22. END IF
23. assignMapTask(job.getUnassignedMapTasks(),freeMapSlotsList,numFreeMapSlots);
24. assignedTasks.append(Maps);
25. END FOR
26. FORjobin jobQueue DO
27. IF numFreeReduceSlots==0THEN
28. BREAK;
29. END IF
30. assignReduceTask(job.getUnassignedReduceTasks(),freeReduceSlotsList,numFreeReduceSlots);
31. assignedTasks.append(Reduces);
32. END FOR
結束。
本文在Hadoop中實現了LSS調度器。LSS的實現主要包括以下幾個部分:(1)增加作業(yè)屬性:每一個實時任務都有自己的截止時間,需要給作業(yè)添加截止時間屬性;(2)繼承抽象類TaskScheduler:主要工作是修改其中分配任務的方法,LSS按照算法1修改了TaskScheduler中的schedule-Tasks()方法;(3)集群配置:將改寫好的調度器打成Jar包,放在主節(jié)點Hadoop根目錄下的lib文件夾下,同時在配置文件mapred-site.xml中添加屬性“mapred.jobtracker.taskScheduler”,并將其值賦為改寫后的TaskScheduler類“org.apache.hadoop.mapred.LLScheduler”。重啟集群后,集群采用的調度器就為LSS。
本文所使用的集群環(huán)境如表1所示。Hadoop版本為 Hadoop-1.1.0。集群包括六個節(jié)點:一個主節(jié)點和五個從節(jié)點,每個從節(jié)點包括兩個Map插槽和一個Reduce插槽。
Table 1 Experimental configuration表1 實驗環(huán)境
實時系統(tǒng)中常用工作流描述提交的作業(yè),同一工作流中提交同一類型的作業(yè)。工作流用Wi=(λi,Di,Si,Mi,Ri)表示,其中λi表示作業(yè)的到達率,作業(yè)到達時間間隔服從1/λi的泊松分布,零時刻沒有作業(yè)提交,如果λi=1/500,則該工作流中作業(yè)的到達時刻可能為482、1 019、1 527、……,如圖1所示;Di表示作業(yè)的相對截止時間,作業(yè)的截止時間為作業(yè)提交時間加上Di;Si表示作業(yè)處理的輸入數據大??;Mi表示作業(yè)中Map任務的數目;Ri表示作業(yè)中Reduce任務的數目。
ure 1 The possible reach time of jobs whenλi =1/500圖1 λi=1/500時工作流中作業(yè)可能到達的時刻
本文設計了兩組實驗,一組為輕負載,一組為重負載。每一組實驗有三個工作流,并分別在默認調度器和LSS上做了實驗。實驗1為輕負載,該實驗中的Di為其作業(yè)平均到達時間間隔,即Di=1/λi;實驗2為重負載,該實驗中的Di小于其作業(yè)平均到達時間間隔,即Di<1/λi。實驗方案如表2所示。WordCount為Hadoop的基準測試用例,本文中的六個工作流選擇的作業(yè)均是WordCount。
Table 2 Experimental scheme表2 實驗方案
圖2a和圖2b分別為實驗1中默認調度器和LSS中不同工作流的資源占用情況。從圖中可以看出,LSS調度器中工作流對資源的搶占比FIFO激烈。這是因為LSS動態(tài)更新了作業(yè)隊列中作業(yè)的優(yōu)先級順序,而FIFO調度器中作業(yè)的優(yōu)先級順序是一直不變的。在LSS中,雖然作業(yè)之間會相互搶占,但搶占消耗的主要是JobTracker上的資源,對從節(jié)點的影響不大,因此作業(yè)隊列中作業(yè)優(yōu)先級順序的動態(tài)更新對作業(yè)在從節(jié)點中的執(zhí)行影響不大。
Figure 2 Occupied slots of the default scheduler under experiment 12 實驗1默認調度器中不同工作流的資源占用情況
圖3 為實驗1和實驗2中默認調度器和LSS調度器的成功率。從圖中可以看出,不管是在輕負載還是在重負載的情況下,LSS調度器的成功率均高于默認調度器,說明LSS能更好地處理硬實時作業(yè)。因為LSS實時更新了作業(yè)隊列中的作業(yè)的優(yōu)先級順序,空閑時間小的作業(yè)的執(zhí)行得到了保證;而且LSS提前將不能成功執(zhí)行的作業(yè)結束了,給其他作業(yè)留出了更多的資源,保證了其他作業(yè)成功執(zhí)行的可能性。但是,默認調度器只采用了先進先出的設計思想,沒有對實時作業(yè)做任何優(yōu)化。因此,在兩種情況下,LSS調度器的成功率均高于默認調度器。
Figure 3 Success ratios of experiment 1and experiment 2圖3 實驗1和實驗2中的成功率
圖4 a和圖4b分別為實驗1、實驗2中不同工作流中作業(yè)的平均響應時間。從圖中可以看出,工作流1中默認調度器的效果比LSS的好。這是因為LSS傾向于保障時限要求更高的工作流的執(zhí)行,當優(yōu)先級更高的工作流2和工作流3有計算需求時,LLS將中斷工作流1的作業(yè)的執(zhí)行,從而延長了工作流1的平均響應時間。而默認調度器上作業(yè)的優(yōu)先級順序不會變化,作業(yè)一旦開始執(zhí)行,則會一直執(zhí)行直到結束,作業(yè)的執(zhí)行不會被中斷,從而使得具有更多空閑時間工作流中作業(yè)的平均響應時間低于LSS調度器。
Figure 4 Average response time圖4 作業(yè)的平均響應時間
實驗結果表明,不管是輕負載還是重負載,LSS調度器中作業(yè)的成功率均高于默認調度器,但是在作業(yè)的平均響應時間方面,時限要求較低的工作流中LSS調度器的效果會低于Hadoop的默認調度器。而硬實時調度器的首要設計目標是提高時限作業(yè)的成功執(zhí)行率,因此在處理硬實時作業(yè)時,LSS調度器的性能得到了提升。
針對Hadoop現有調度器在處理硬實時作業(yè)上的不足,本文設計和實現了一種基于空閑時間的調度器LSS。為了提高集群在處理硬實時作業(yè)時的整體成功率,減輕集群的負載,LSS設計過程中采用了四個機制:(1)用空閑時間描述作業(yè)的優(yōu)先級;(2)動態(tài)預測作業(yè)的空閑時間,并實時更新預測結果;(3)實時更新作業(yè)隊列中作業(yè)的優(yōu)先級順序;(4)根據空閑時間的預測結果,提前結束不能成功執(zhí)行的作業(yè)。實驗結果表明,LSS能有效提高時限作業(yè)的成功執(zhí)行率。預測空閑時間時,本文考慮的是同構集群,因此如何預測異構集群上的空閑時間將是本文的下一步研究工作。
[1] Dean J,Ghemawat S.MapReduce:Simplified data processing on large clusters[J].Communications of the ACM,2008,51(1):107-113.
[2] Lee K,Lee Y,Choi H,et al.Parallel data processing with MapReduce:A survey[J].ACM SIGMOD Record,2012,40(4):11-20.
[3] Li Rui,Wang Bin.MapReduce in text processing[J].Journal of Chinese Information Process,2012,26(4):9-20.(in Chinese)
[4] Kc K,Anyanwu K.Scheduling Hadoop jobs to meet deadlines[C]∥Proc of the 2nd IEEE International Conference on Cloud Computing Technology and Science,2010:388-392.
[5] Ma Xiao-yan,Hong Jue.A mutil-resource fair scheduler of Hadoop[J].Journal of Integration Technology,2012,1(3):66-71.(in Chinese)
[6] Zaharia M,Borthakur D,Sen Sarma J,et al.Delay scheduling:A simple technique for achieving locality and fairness in cluster scheduling[C]∥Proc of the 5th European Conference on Computer Systems,2010:265-278.
[7] Guo Z,Fox G.Improving MapReduce performance in heterogeneous network environments and resource utilization[C]∥Proc of the 12th IEEE/ACM International Symposium on Cluster,Cloud and Grid Computing,2012:714-716.
[8] Xu Cheng,Liu Hong,Tan Liang.New mechanism of monitoring on Hadoop cloud platform[J].Computer Science,2013,40(1):112-117.(in Chinese)
[9] Li Xin,Zhang Peng.Improvement and implementation of fair scheduling algorithms in Hadoop cluster[J].Computer Knowledge and Technology,2012(1):166-168.(in Chinese)
[10] Wei Xiao-hui,Fu Qing-wu,Li Hong-liang.Resource forecast delay algorithm for Hadoop systems[J].Journal of Jilin University,2013(1):101-106.(in Chinese)
[11] Ning Wen-yu,Wu Qing-bo,Tan Yu-song.MapReduce oriented self-adaptive delay scheduling algorithm[J].Computer Engineering &Science,2013,35(3):52-57.(in Chinese)
[12] Dong X,Wang Y,Liao H.Scheduling mixed real-time and non-real-time applications in MapReduce environment[C]∥Proc of the 17th IEEE International Conference on Parallel and Distributed Systems,2011:9-16.
[13] Lin Qing-ying.Cloud computing model on Hadoop[J].Modern Computer,2010(7):114-116.(in Chinese)
附中文參考文獻:
[3] 李銳,王斌.文本處理中的 MapReduce技術[J].中文信息學報,2012,26(4):9-20.
[5] 馬肖燕,洪爵.多資源公平調度器在Hadoop中的實現[J].集成技術,2012,1(3):66-71.
[8] 許丞,劉洪,譚良.Hadoop云平臺的一種新的任務調度和監(jiān)控機制[J].計算機科學,2013,40(1):112-117.
[9] 李鑫,張鵬.Hadoop集群公平調度算法的改進與實現[J].電腦知識與技術,2012(1):166-168.
[10] 魏曉輝,付慶午,李洪亮.Hadoop平臺下基于資源預測的Delay調度算法[J].吉林大學學報:理學版,2013(1):101-106.
[11] 寧文瑜,吳慶波,譚郁松.面向MapReduce的自適應延遲調度算法[J].計算機工程與科學,2013,35(3):52-57.
[13] 林清瀅.基于 Hadoop的云計算模型[J].現代計算機,2010(7):114-116.