劉 備,譚新明,曹文彬
(武漢理工大學(xué) 計算機科學(xué)與技術(shù)學(xué)院,武漢 430063)
Spark Streaming動態(tài)資源分配策略
劉 備*,譚新明,曹文彬
(武漢理工大學(xué) 計算機科學(xué)與技術(shù)學(xué)院,武漢 430063)
(*通信作者電子郵箱liubei1203@163.com)
針對Spark Streaming作為混合大數(shù)據(jù)計算平臺流處理組件時資源調(diào)整周期長和不能滿足多應(yīng)用多用戶個性化需求的問題,提出了一種多應(yīng)用下動態(tài)資源分配策略(DRAM)。該策略增加了應(yīng)用全局變量來控制動態(tài)資源分配過程。首先,獲取歷史執(zhí)行數(shù)據(jù)反饋和應(yīng)用全局變量;然后,進(jìn)行資源增減計算;最后,進(jìn)行資源增減執(zhí)行。實驗結(jié)果表明,所提策略能夠有效調(diào)整應(yīng)用資源配額,且在穩(wěn)定數(shù)據(jù)流和不穩(wěn)定數(shù)據(jù)流兩種情況下,其處理延時相比原Spark平臺的Streaming策略和Core策略都有所降低;同時該策略也能夠提高集群資源利用率。
Spark;實時數(shù)據(jù)流;多應(yīng)用;動態(tài)資源分配
隨著針對數(shù)據(jù)流的研究,大規(guī)模動態(tài)數(shù)據(jù)集(也稱為實時數(shù)據(jù)流)成為研究及工程人員爭相探索的熱點領(lǐng)域。大數(shù)據(jù)時代持續(xù)性流數(shù)據(jù)呈指數(shù)型增長,讓實時數(shù)據(jù)流處理受到很大的關(guān)注。Spark[1]作為一個高效的分布式計算系統(tǒng)和頂級的內(nèi)存計算技術(shù),其組件Spark Streaming[2]對于可靠的、高吞吐量和低延遲的流處理有著很好的支持。
目前Spark應(yīng)用資源分配默認(rèn)采取預(yù)分配的方式,資源量在程序提交時已經(jīng)確定直到查詢或者計算退出。當(dāng)多個應(yīng)用共享一個Spark集群時,集群資源總量是有限的,即多個應(yīng)用的資源總量固定。而流處理中數(shù)據(jù)量往往表現(xiàn)出動態(tài)變化性,用戶查詢也具有隨機性的特征,這樣一個Spark Streaming應(yīng)用需要的計算資源并不是恒定不變的,某個時間段可能需要的計算單元會陡然增加或者減少,靜態(tài)配置應(yīng)用程序資源不能滿足資源合理利用需求,多用戶下同一時間段不同應(yīng)用程序有不同大小的計算任務(wù),對資源的迫切程度也不一樣。對資源動態(tài)分配既可以保證流處理的實時性和資源合理分配,又能夠滿足用戶個性化需求。雖然Spark Core和Spark Streaming均提供動態(tài)資源分配機制來實現(xiàn)動態(tài)增減應(yīng)用計算資源,但是現(xiàn)有的兩種策略并不能保證處理的實時性,也不具備多應(yīng)用的特征。分析結(jié)果表明,對于不穩(wěn)定的輸入流,流處理延遲時間呈現(xiàn)很大的波動性。
為了降低多用戶下流處理延時,提高集群資源利用率,本文提出一種基于Spark Streaming流處理的多應(yīng)用下動態(tài)資源分配策略(Dynamic Resource Allocation strategy of Multi-application, DRAM),以減少數(shù)據(jù)流頻繁波動情況下處理延時和資源動態(tài)分配時間,并比較兩種流處理場景下該方法的可行性和性能。
企業(yè)級數(shù)據(jù)平臺往往需要滿足多種應(yīng)用場景如流處理業(yè)務(wù)場景、海量批處理、迭代計算、圖計算等。在一個項目中同時滿足多種業(yè)務(wù)需求,需要使用多套特化系統(tǒng)。一方面在各種不同系統(tǒng)之間避免不了要進(jìn)行數(shù)據(jù)轉(zhuǎn)儲(Extract Transform Load, ETL), 這無疑將增加系統(tǒng)的復(fù)雜程度和負(fù)擔(dān);另一方面使用多套系統(tǒng)也增加了使用和維護(hù)的難度,使用Spark系統(tǒng)則可以適用于目前常用的各種大數(shù)據(jù)計算模式[3]。同時Spark在Yarn模式下的運行提供的資源隔離和資源彈性管理以及對傳統(tǒng)批處理系統(tǒng)中文件存儲的支持也方便企業(yè)對于歷史數(shù)據(jù)的利用。故構(gòu)建多用戶下軟件即服務(wù)(Software as a Service, SaaS)[4]模式平臺計算中心采用Spark平臺比較適用。
計算平臺中常用的數(shù)據(jù)流可以來自股票市場的時序分析、企業(yè)交易、各種交互事件、Web流量、點擊流和傳感器數(shù)據(jù)等,都是即時且?guī)в袝r間戳的數(shù)據(jù)[5]。這些數(shù)據(jù)流需要及時處理以便于監(jiān)測,例如異常檢測、異常奇點、垃圾郵件、欺詐和入侵; 也可提供基礎(chǔ)的統(tǒng)計、計算和推薦。某些情形,總結(jié)性的匯總數(shù)據(jù)需要存儲以備將來使用。計算平臺中的數(shù)據(jù)流具有海量性、實時性和動態(tài)變化性的特點,所以數(shù)據(jù)平臺的處理任務(wù)大小也具備動態(tài)變化特征,同樣企業(yè)中對于數(shù)據(jù)流計算的查詢也是動態(tài)變化的。
為保證數(shù)據(jù)處理實時性,資源需要動態(tài)變化,這樣一方面提高了資源利用率,另一方面提高了實時性。已有的對于Spark Streaming實時性的改進(jìn)更多的是對于微批處理大小、時間間隔長短等方面進(jìn)行改進(jìn)來保證輸入流的穩(wěn)定性。如文獻(xiàn)[6]中研究微批處理大小對流處理的影響,從歷史數(shù)據(jù)中得到信息修改batch間隔來保證輸入流的平穩(wěn),但是對于周期性波動流數(shù)據(jù)依然有很大延遲。文獻(xiàn)[7]中根據(jù)半個batch 周期事件的平均值來控制生成任務(wù)數(shù)量,保證輸入流平穩(wěn),但是缺少與Spark平臺動態(tài)資源分配機制的結(jié)合。大數(shù)據(jù)計算中往往可以通過直接對資源動態(tài)分配來保證資源有效利用。文獻(xiàn)[8]對云計算下虛擬機資源動態(tài)分配進(jìn)行了研究。文獻(xiàn)[9]對異構(gòu)Hadoop計算資源動態(tài)分配進(jìn)行了研究。這些對資源動態(tài)分配的研究大多集中在Hadoop等平臺,目前 Spark平臺中除了在Spark Core和Spark Streaming提供相應(yīng)的動態(tài)資源分配機制外,并未見到對動態(tài)資源分配機制的研究。
2.1 Spark Core運行過程分析
Spark默認(rèn)采用的是資源預(yù)分配和粗粒度的方式分配資源。所謂資源單位一般指的是Executor,Executor是某Application運行在Worker Node上的一個進(jìn)程,該進(jìn)程維持線程池運行Task,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上。每個Application提交運行時將申請獲取一組獨立運行該Application中Task的Executor資源。直到運行結(jié)束,該程序?qū)⒁恢背钟匈Y源。提交程序時,通常使用num-executors來指定Application使用的Executor數(shù)量,而executor-memory和executor-cores分別用來指定每個Executor所使用的內(nèi)存和虛擬內(nèi)核數(shù)。Spark基本運行過程如圖1所示。
Spark應(yīng)用從用戶提交應(yīng)用到應(yīng)用運行結(jié)束需要經(jīng)過如下幾個步驟:
a)用戶啟動客戶端,向集群提交用戶程序。
b)Resource Manager遍歷可用Woker節(jié)點,隨機選取一個符合SparkAppMaster資源要求的Worker作為新建SparkAppMaster的節(jié)點,并向Worker中NodeManager發(fā)送新建Executor請求。
c)NodeManager在選中的Worker節(jié)點上新建Executor用來運行SparkAppMaster,并在SparkAppMaster中創(chuàng)建運行環(huán)境SparkContext[1](啟動Driver進(jìn)程,創(chuàng)建SparkContext及DAGScheduler、TaskSchduler等)。
d)SparkAppMaster向ResourceManager發(fā)送申請Executor資源的請求。
e)ResourceManager分配Executor給SparkAppMaster,SparkAppMaster和相關(guān)的NodeManager通信,NodeManager向SparkAppMaster中的SparkContext注冊并等待分配Task。
f)SparkContext將Applicaiton代碼序列化發(fā)送給Executors,并且SparkContext解析代碼構(gòu)建運行邏輯的有向無環(huán)圖(Directed Acyclic Graph, DAG),并提交給DAGScheduler分解成Stage(Action操作時,就會生成Job或Job集合;每個Job中含有1個或多個Stage,Stage產(chǎn)生在RDD寬依賴之間),然后將Stage中TaskSet提交給TaskScheduler,TaskScheduler負(fù)責(zé)將Task分配到應(yīng)用中滿足條件的Executor上執(zhí)行;Executor執(zhí)行Task并向SparkAppMaster中的SparkContext 匯報運行狀況;Task運行完畢,SparkContext歸還資源給ResourceManager,并注銷退出。
圖1 Spark運行流程
Spark Streaming應(yīng)用運行時會調(diào)用Spark Core中步驟a)~e)新建Spark AppMaster并獲取應(yīng)用所需資源,然后執(zhí)行如下步驟:
1)Spark AppMaster選取一個或多個節(jié)點新建Executor。
2)Node Manager將新建的Executor作為數(shù)據(jù)接收器Receiver,數(shù)據(jù)流由Receiver處理。
3)Receiver將接收的數(shù)據(jù)按照應(yīng)用時間間隔存儲為batch或數(shù)據(jù)塊并將數(shù)據(jù)塊信息返回給Spark AppMaster。
4)Spark AppMaster按照Spark Core中步驟f)中劃分為Task集合,Spark AppMaster將Receiver中batch信息和Task集合發(fā)送給各個節(jié)點中Executor。
5)各節(jié)點中Executor根據(jù)數(shù)據(jù)塊位置進(jìn)行Task執(zhí)行并返回處理結(jié)果。
2.2 Spark Core動態(tài)資源分配分析
Spark 1.2版本開始在Spark-on-Yarn模式下提供動態(tài)資源分配機制。通過應(yīng)用程序設(shè)置dynamicAllocation.enabled=true開啟,同時可設(shè)置的可用屬性有:最小分配Executor數(shù)minEs、最大分配Executor數(shù)maxEs、資源過期時間eit、執(zhí)行等待時間sbto、申請資源時間間隔sto。
基本思想是:Application在Job中Task因沒有足夠資源被掛起的時候去動態(tài)申請資源,這種情況意味著現(xiàn)有的Executor數(shù)不能滿足所有Task并行運行,所以需要向集群資源管理器申請更多資源,每隔一段時間申請一次,一直到申請足夠的資源。當(dāng)Application中分配到的Executor掛起或者等待Task超過過期時間(默認(rèn)為1s)的時候,集群資源管理器會釋放該Executor資源。
在算法1中,系統(tǒng)通過監(jiān)聽器監(jiān)聽集群中Executor狀態(tài)信息和任務(wù)執(zhí)行信息,新提交的Task集合需求的資源大于現(xiàn)有資源時,系統(tǒng)記錄開始本次資源動態(tài)分配的時間at,并會根據(jù)當(dāng)前任務(wù)需要的最大的Executor數(shù)numN和當(dāng)前已經(jīng)有的Excutor數(shù)numT比較來增加和刪除程序資源,起始當(dāng)numN 算法1Sparkcore動態(tài)資源分配。 輸入Stage,最小/大資源量minEs/maxEs,開始添加資源時間at; 輸出 資源增減。 1) 根據(jù)Stage生成Task和每個任務(wù)需要資源量,得到numN; 2) 獲取當(dāng)前已獲得資源量numT,當(dāng)前時間ct,需要增加的資源數(shù)numEA以及at是否設(shè)置; 3) if(numN≤numT)then 4) at←ct;numEA←1; 5) numT←max(numN,minNum) 6) elseif(ct≥atandat==not set )then 7) addExecutor(numN); 8) end if 9) end if 10) addExecutor()中根據(jù)numN、ct、at和sto計算numEA,增加資源到numN; 11) removeExecutor()對放入刪除列表的資源經(jīng)過sbo時間執(zhí)行刪除。 3.1SparkStreaming運行機制分析 SparkStreaming作為Spark計算平臺的組件之一,充分利用了Spark的核心架構(gòu)。同時作為流處理功能的入口點StreamingContext[2],它構(gòu)建在SparkContext[1]之上。集群管理器將至少單獨分片一個工作節(jié)點作為接收器,這是一個長時間運行的任務(wù)執(zhí)行器來處理進(jìn)入的流數(shù)據(jù)。執(zhí)行器創(chuàng)建DiscretizedStreams[10]或者從輸入數(shù)據(jù)流中得來的一組彈性分布式數(shù)據(jù)(ResilientDistributedDataset,RDD)[11]集合DStreams,DStream默認(rèn)為另一個WorkerNode的緩存。接收器服務(wù)于輸入數(shù)據(jù)流,多個接收器提升了并行性,產(chǎn)生多個DStreams,SparkStreaming用它操作RDD。流處理程序中Application的action操作生成Job集合提交給Spark內(nèi)核,每個Job生成Task集合給Executor進(jìn)程執(zhí)行。 如圖2所示,SparkAppMaster將接收器Receiver作為一個Task提交給一個Executor,Receiver啟動會按照時間間隔batchinterval讀入時間長度為batchDuration的流數(shù)據(jù),生成數(shù)據(jù)塊block;Job生成模塊JobGenerator根據(jù)batch生成相應(yīng)的Job,Job提交給Job執(zhí)行模塊JobPocessor,JobPocessor在集群中尋找空閑Executor執(zhí)行Job中Task集合。 圖2 Streaming運行流程 SparkStreaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計算過程中會出現(xiàn)batchprocessingtime>batchinterval的情況,其中:batchprocessingtime為實際計算一個批次花費時間,batchinterval為Streaming應(yīng)用設(shè)置的批處理間隔。這意味著SparkStreaming的數(shù)據(jù)接收速率高于Spark從隊列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當(dāng)前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長的時間,會造成數(shù)據(jù)在內(nèi)存中堆積,導(dǎo)致Receiver所在Executor內(nèi)存溢出等問題。 流處理中每個時間切片batchDuration中數(shù)據(jù)量大小差別很大,導(dǎo)致要求的資源差別很大,如果幾個周期內(nèi)還沒調(diào)整完資源,可能導(dǎo)致任務(wù)掛起或者執(zhí)行延時較大,所以SparkCore中動態(tài)資源分配算法在流處理中并不適用。 3.2SparkStreaming動態(tài)資源分配分析 SparkStreaming組件將數(shù)據(jù)輸入流拆分為一個個batch去處理,batch中每個記錄處理的總時間都不一樣。一個batch中早到達(dá)的記錄會有一個長的延遲時間。假設(shè)時間切片為Tw即batch時間長度,最壞的情況下一個batch中記錄的總執(zhí)行時間為Tt=Tw+Te,Te代表記錄e總延遲時間,而在SparkStreaming中Te分為事件調(diào)度時間Ts和事件運行時間Tp,故Tt=Tw+Ts+Tp。文獻(xiàn)[7]中通過對比實驗得出,當(dāng)batch間隔太小時即Tw太小時會導(dǎo)致batch中數(shù)據(jù)量太小產(chǎn)生的Job數(shù)量會陡然增加,從而導(dǎo)致大部分小Job在等待分配資源隊列中,Ts時間幾乎可以代表總執(zhí)行時間Tt,當(dāng)獲取到資源之后當(dāng)Executer不足時會延長Tp時間,所以Tp時間長短代表資源充足與否。 Spark1.5版本推出的SparkStreaming動態(tài)資源分配機制獲取前一個batch的Tp時間來判斷是否需要增減資源。即算法2中從監(jiān)聽器獲取前一batch總運行時間Tp和事件總數(shù)bpc,同時從StreamingContext中獲取參數(shù)上延時比例sur和下延時比例sdr。比較事件平均處理時間Tavg=Tp/bpc和時間窗口Duration,得到兩者比例Ratios,Ratios≥sur則請求資源,請求資源數(shù)為max(round(Ratios),1),Ratios≤sdr則減少資源,減少資源時需要確認(rèn)Executor不存在Receiver。為了保證之前的資源調(diào)整完畢,每隔sis時間調(diào)整一次,sis由應(yīng)用程序設(shè)置。 算法2SparkStreaming動態(tài)資源分配。 輸入 最小/大資源量minEs/maxEs,batch周期時間bd,上/下延時比例sur/sdr,動態(tài)管理時間間隔sis。 輸出 資源增減。 1) 獲取執(zhí)行時間總量Tp和微批處理總個數(shù)bpc,計算平均批處理執(zhí)行時間t1。 2) Ratios=t1/bd; 3) ifRatios≥surthen 4) NumEA←max(round(t1),1); 5) RequestExecutor(NumEA); 6) elseifRatios≤sdrthen 7) KillExecutor(); 8) end if 9) end if 3.3 多應(yīng)用下的動態(tài)資源分配策略 Spark Core中動態(tài)資源策略在面對Spark Streaming流處理中batch前后差距很大的情況時,需要幾個周期去資源調(diào)整,所以流處理中并不適用?,F(xiàn)有的Spark Streaming中動態(tài)資源分配算法僅僅考慮了前一batch的執(zhí)行時間,當(dāng)數(shù)據(jù)流中事件呈現(xiàn)周期性波動性很大時,會造成系統(tǒng)頻繁地去增減資源造成系統(tǒng)抖動,同樣資源調(diào)整需要花費較多的周期數(shù),造成Tp時間延遲,所以更多的時候需要結(jié)合控制輸入流流入速率機制來保證輸入流的穩(wěn)定性以達(dá)到要求。多用戶多應(yīng)用提交應(yīng)用程序到集群處理時,對任務(wù)處理實時性期望不一樣,數(shù)據(jù)流量也會不同,當(dāng)需要增減資源配額時,高優(yōu)先級用戶可能需要下一個周期就能滿足資源需求,而低優(yōu)先級用戶則可以通過幾個周期來調(diào)整。多用戶的需求體現(xiàn)在數(shù)據(jù)的穩(wěn)定性和資源需求的迫切性,所以需要增加流處理應(yīng)用程序參數(shù)來區(qū)分各個應(yīng)用的調(diào)整比例和最小資源保有情況,實現(xiàn)多應(yīng)用下的動態(tài)資源分配策略(DRAM)。 3.3.1 動態(tài)資源分配模型 根據(jù)SparkStreaming應(yīng)用運行機制和現(xiàn)有SparkStreaming動態(tài)資源分配機制分析得到SparkStreaming動態(tài)資源分配模型,如圖3所示。 圖3 動態(tài)資源分配模型 從圖3中可以看出,數(shù)據(jù)流經(jīng)過batch生成模塊BatchGenerator拆分為一個個batch,提交給Job管理器JobGenerator;JobGenerator結(jié)合Application中全局參數(shù)和任務(wù)處理步驟生成對RDD的操作即Job集合,然后提交到集群中由JobProcessor運行;監(jiān)視器器周期性從JobProcessor中獲取歷史Job數(shù)據(jù);動態(tài)管理器AllocationController獲得監(jiān)聽器歷史數(shù)據(jù)反饋并結(jié)合Application環(huán)境變量,計算資源調(diào)整結(jié)果;AllocationController將資源增減結(jié)果發(fā)送給StreamingContext,StreamingContext向集群管理器提交增減任務(wù)資源請求;集群管理器增減Job資源量并發(fā)送給Job執(zhí)行器。所以動態(tài)資源分配模型可分解為獲取歷史數(shù)據(jù)和應(yīng)用程序全局變量、資源增減計算和資源增減執(zhí)行三個部分。 3.3.2 歷史數(shù)據(jù)和全局變量獲取 在開始動態(tài)資源分配前,需要獲得輸入?yún)?shù)。針對多個用戶任務(wù)的實時性要求不同,在全局變量中添加了任務(wù)重要比例ar、參考batch數(shù)rbs、資源減少周期數(shù)rr、減少資源時保有比例rra;同時設(shè)置上/下延時比例sur/sdr、動態(tài)調(diào)整時間sis和最小/大資源量minEs/maxEs。歷史數(shù)據(jù)需要獲取任務(wù)已有資源量ctn、Job Processor中前rbs個batch中事件平均執(zhí)行時間t1i(i=1,2,…,rbs)、batch切片時間bd。全局變量中,ar代表增加資源坡度,區(qū)間為[0,1],默認(rèn)值為1;減少資源方面,rr表示資源減少時需要在rr個sis時間內(nèi)減少資源,默認(rèn)值為1。為了防止資源頻繁增減造成系統(tǒng)震蕩,設(shè)置了最終需要保有的比例值rra。 3.3.3 資源增減計算 當(dāng)開啟動態(tài)資源分配并獲得歷史數(shù)據(jù)和全局變量后,AllocationController首先要判斷是否需要增減資源。這里需要參考rbs個batch中事件平均執(zhí)行時間t1i計算動態(tài)資源分配所參考的batch中事件平均執(zhí)行時間T1,并計算它與batchDuration的比例Ratios: (1) Ratios=T1/bd (2) 當(dāng)Ratios處于區(qū)間[sdr,sur]時,動態(tài)管理器不進(jìn)行資源調(diào)整,直接進(jìn)入下一個sis周期。當(dāng)Ratios≥sur時,表明處理時間大于batch切片間隔,則任務(wù)現(xiàn)有資源不能滿足處理需求,需要給任務(wù)添加資源,添加的資源量為: NumEA=(maxEs-ctn)*ar (3) 當(dāng)Ratios≥sdr時說明當(dāng)前用戶任務(wù)資源過剩,需要減少任務(wù)資源避免資源浪費,這里需要分多個周期來減少并能保有一定比例來防止系統(tǒng)震蕩,總的要減少的資源量為: NumPr=round(ctn*((bd-T1)/bd-rra)) (4) 當(dāng)前周期內(nèi)實際需要減少的資源量為: NumArn=NumPr/bd (5) 3.3.4 資源增減執(zhí)行 應(yīng)用程序設(shè)置參數(shù)sis來保證資源調(diào)整時間的,sis默認(rèn)60s。當(dāng)AllocationController向StreamingContext提交資源調(diào)整請求后,StreamingContext將請求發(fā)送給集群管理器來增減任務(wù)資源。增加資源后,資源量不高于maxEs,當(dāng)空閑資源不足時會等待;減少資源時首先需要確定資源中沒有Receiver,減少的資源Executor將不會再分配任務(wù),并交給集群管理器去異步減少。 算法3 多應(yīng)用動態(tài)資源分配算法。 輸入 最小資源量minEs,最大資源量maxEs,任務(wù)重要比例ar,資源減少周期數(shù)rr,保有比例rra,參考周期數(shù)rbs,batch周期時間bd,上/下延時比例sur/sdr; 輸出 資源增減。 1) 獲取當(dāng)前資源量ctn,得出前rbs個周期平均執(zhí)行時間T1; 2) Ratios=T1/bd; 3) ifRatios≥surthen 4) NumEA←(maxNum-ctn)*ar; 5) RequestExcutor(NumEA); 6) elseifRatios≤sdrthen 7) numPr←round(ctn*((bd-t1)/bd-rra)); 8) arn=NumPr/rr; 9) KillExecutor(arn); 10) end if 11) end if 4.1 測試環(huán)境配置 實驗中采用8臺虛擬機模擬物理機器搭建Spark集群,集群配置情況總的有28個CPU核數(shù)、24 GB內(nèi)存,CPU主頻為2.20 GHz,Linux版本是32 bit Ubuntu14.04,Spark版本是1.6.1,集群中有一臺服務(wù)器作為Master,其余七臺作為Slave,每個應(yīng)用程序能請求到的最大的CPU核數(shù)為4,集群運行模式為Spark on Yarn,集群中最多允許提交的任務(wù)數(shù)為 4。 實驗程序選取Spark中常用的應(yīng)用實例作為應(yīng)用程序提交:第一個是WordCount,用來統(tǒng)計數(shù)據(jù)流中單詞出現(xiàn)的次數(shù),每一次微批處理相當(dāng)于一組接收自網(wǎng)絡(luò)流接口的單詞組。第二個是Grep,應(yīng)用于輸入數(shù)據(jù)流中匹配目標(biāo)字符串并計算字符串出現(xiàn)的個數(shù)。為了測試多種數(shù)據(jù)下的性能表現(xiàn),選取不同的數(shù)據(jù)流類型模擬輸入流,數(shù)據(jù)流類型分別為平滑的數(shù)據(jù)流、不平滑的數(shù)據(jù)流。平滑的輸入流大部分時間都是穩(wěn)定的,但為了測試資源動態(tài)分配導(dǎo)致的執(zhí)行時間變化也設(shè)置了峰值,平滑數(shù)據(jù)流batch間隔為1 s,batch中event個數(shù)在2 000上下浮動500個event;不平滑的輸入流則設(shè)置了周期性大量變化的時間以對系統(tǒng)進(jìn)行性能測試,每隔50個batch會有一次尖銳的峰值,batch中event數(shù)量增加到5 000以上,然后event數(shù)量穩(wěn)定到1 300。Spark Streaming利用Spark內(nèi)核去執(zhí)行流處理任務(wù),而Spark的優(yōu)勢在于利用內(nèi)存來緩存中間結(jié)果以及儲存,所以Spark集群中內(nèi)存利用率通常會很高,選取內(nèi)存利用率作為資源利用率的參考不具備代表性。所以這里比較三種策略在相同條件下batch總執(zhí)行時間和CPU利用率的情況。實驗結(jié)果通過查看日志文件得到。實驗結(jié)果中,Core代表Spark Core動態(tài)資源分配策略下實驗情況,Streaming表示Spark Streaming動態(tài)資源動態(tài)分配策略下實驗情況,DRAM即為多應(yīng)用動態(tài)資源分配策略下實驗結(jié)果。 4.2 結(jié)果分析 4.2.1 穩(wěn)定數(shù)據(jù)流分析 將Grep和WordCount分別提交到Spark集群中,分別設(shè)置在Spark Core動態(tài)資源分配策略、Spark Streaming動態(tài)資源動態(tài)分配策略和多應(yīng)用動態(tài)資源分配策略(DRAM)下運行。Spark Core動態(tài)資源分配策略和Spark Streaming動態(tài)資源動態(tài)分配中程序參數(shù)設(shè)置均設(shè)置為系統(tǒng)默認(rèn)值,多用戶動態(tài)資源分配策略中任務(wù)重要比例均設(shè)置為0.5,參考周期設(shè)置為1,資源保有比例設(shè)置為0.2,上下延時比例設(shè)置為0.9/0.3。實驗中考慮到影響集群運行的因素有很多,為了保證實驗代表性,所以在相同的穩(wěn)定輸入流下運行了500個batch,經(jīng)過反復(fù)實驗后發(fā)現(xiàn)系統(tǒng)前50個batch需要經(jīng)歷初始化過程,流處理延時波動會比較大,故丟棄前50個batch,取系統(tǒng)穩(wěn)定階段平均處理時間。統(tǒng)計了三種策略下平均任務(wù)完成時間如圖4所示。 圖4 穩(wěn)定數(shù)據(jù)流下Grep和WordCount平均執(zhí)行時間 從圖4中可以看到,Grep應(yīng)用batch平均處理時間方面, DRAM比Streaming策略降低了接近15%,同樣,DRAM比Core策略降低了17%。在使用DRAM的Spark Streaming平臺中WordCount應(yīng)用batch處理時間比使用Core和Streaming兩種策略的batch處理時間也有相應(yīng)的降低。 4.2.2 不穩(wěn)定數(shù)據(jù)流分析 動態(tài)資源分配策略使用場景為:當(dāng)數(shù)據(jù)流中數(shù)據(jù)量增長時,實時增加程序完成任務(wù)缺少的資源,減少任務(wù)等待時間;在數(shù)據(jù)流中處理事件較少時,能夠平滑地減少資源數(shù),做到計算資源合理利用。在不穩(wěn)定數(shù)據(jù)流的測試中,每隔一段時間batch中的處理事件(這里的處理事件數(shù)是指由某一時間段數(shù)據(jù)流生成的處理任務(wù)集)呈現(xiàn)周期性的尖銳峰值增長,應(yīng)用程序設(shè)置與穩(wěn)定數(shù)據(jù)流下一致,程序運行500 batch之后查看日志下某一個batch的執(zhí)行時間并記錄表格。圖5表示W(wǎng)ordCount應(yīng)用程序在三種不同策略下的batch總處理時間(total time)和batch中事件的個數(shù)(event number)。 由圖5可以看出,在有尖銳峰的不穩(wěn)定數(shù)據(jù)流下,DRAM相對于Core和Streaming動態(tài)資源分配策略,其batch總執(zhí)行時間波動較小,且平均時間有所減低,表明DRAM能夠在不穩(wěn)定數(shù)據(jù)流中對任務(wù)資源動態(tài)分配,降低系統(tǒng)延時。 在處理過程中,集群資源的利用率越高代表資源空閑時間越短,集群資源利用越有效。圖6為應(yīng)用程序處理過程中各個節(jié)點平均CPU資源利用率。從圖6中可以看出,DRAM的平均節(jié)點利用率高于另外兩種策略,這說明DRAM集群資源空閑時間更短,資源利用率更高。 針對當(dāng)前Spark計算平臺應(yīng)用于多應(yīng)用SaaS平臺中面臨的數(shù)據(jù)流實時波動情景下,現(xiàn)有動態(tài)資源分配機制響應(yīng)不夠及時并需要結(jié)合控制數(shù)據(jù)速率來提高實時性的問題,本文通過對現(xiàn)有機制進(jìn)行分析,提出動態(tài)資源分配模型,并針對多用戶下多數(shù)據(jù)流變化的特點,根據(jù)系統(tǒng)反饋任務(wù)資源信息變化,增加了任務(wù)重要比例參數(shù)和減少周期、比例的基礎(chǔ)上,實時調(diào)整應(yīng)用程序資源,以更好應(yīng)對突發(fā)計算任務(wù)。實驗結(jié)果表明,本文所提方法能夠有效保證計算任務(wù)的實時執(zhí)行,優(yōu)化了Spark Streaming動態(tài)資源分配,提高了集群資源利用率。未來將繼續(xù)研究測試方法中參數(shù)值的合理化設(shè)置以及動態(tài)資源分配機制與控制流入速率方面的結(jié)合。 圖5 不同策略在不穩(wěn)定數(shù)據(jù)流下執(zhí)行時間 圖6 各節(jié)點平均CPU資源利用率 References) [1] ZAHARIA M, CHOWDHURY M, FRANKLIN M J, et al. Spark: cluster computing with working sets [C/OL]// HotCloud’10: Proceedings of the 2010 2nd USENIX Conference on Hot Topics in Cloud Computing. Berkeley, CA: USENIX Association, 2010. [2016- 10- 25]. https://www.usenix.org/legacy/events/hotcloud10/tech/full_papers/Zaharia.pdf. [2] 夏俊鸞,邵賽賽.Spark Streaming:大規(guī)模流式數(shù)據(jù)處理的新貴[J].程序員,2014(2):44-47.(XIA J L, SHAO S S. Spark Streaming: the upstart of large-scale streaming data processing [J]. Programmer, 2014(2): 44-47.) [3] 胡俊,胡賢德,程家興.基于Spark的大數(shù)據(jù)混合計算模型[J].計算機系統(tǒng)應(yīng)用,2015,24(4):214-218.(HU J, HU X D, CHENG J X. Big data hybrid computing mode based on Spark [J]. Computer Systems & Applications, 2015, 24(4): 214-218.) [4] 王舜燕,黃芬,劉萬春.基于SaaS模式的軟件設(shè)計方法探討[J].計算機與數(shù)字工程,2008,36(10):102-105.(WANG S Y, HUANG F, LIU W C. Software design based on SaaS [J]. Computer & Digital Engineering, 2008, 36(10): 102-105.) [5] 彭宏,劉洋,鄧維維,等.股票數(shù)據(jù)流的相關(guān)性計算方法[J].華南理工大學(xué)學(xué)報(自然科學(xué)版),2006,34(1):86-89.(PENG H, LIU Y, DENG W W, et al. Computing method of correlation of stock data streams [J]. Journal of South China University of Technology (Natural Science Edition), 2006, 34(1): 86-89.) [6] DAS T, ZHONG Y, STOICA I, et al. Adaptive stream processing using dynamic batch sizing [C]// SOCC’14: Proceedings of the 2014 ACM Symposium on Cloud Computing. New York: ACM, 2014: 1-13. [7] LIAO X Y, GAO Z W, JI W X, et al. An enforcement of real time scheduling in Spark Streaming [C]// IGSC’15: Proceedings of the 2015 Sixth International Green and Sustainable Computing Conference. Washington, DC: IEEE Computer Society, 2015: 1-6. [8] 吳杰謙,嚴(yán)然,欒鐘治,等.云計算環(huán)境下資源動態(tài)分配方法研究[C/OL]//2013全國高性能計算學(xué)術(shù)年會論文集.桂林:中國計算機學(xué)會,2013:677-680.[2016- 10- 25].http://www.docin.com/p-1205736858.html.(WU J Q, YAN R, LUAN Z Z, et al. Research on dynamic resource allocation in cloud [C/OL] // Proceedings of the 2013 China High Performance Computing Annual Meeting. Guilin: China Computer Federation, 2013: 677-680. [2016- 10- 25]. http://www.docin.com/p-1205736858.html.) [9] 李鋒剛,魏炎炎,楊龍.基于和聲算法異構(gòu)Hadoop集群資源分配優(yōu)化[J].計算機工程與應(yīng)用,2014,50(9):98-102.(LI F G, WEI Y Y, YANG L. Computing resource optimization in heterogeneous Hadoop cluster based on harmony search algorithm [J]. Computer & Digital Engineering, 2014, 50(9): 98-102.) [10] ZAHARIA M, DAS T, LI H Y, et al. Discretized streams: fault-tolerant streaming computation at scale [C]// SOSP’13: Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. New York: ACM, 2013: 423-438. [11] KANG W, KAPITANOVA K, SANG H S. RDDS: a real-time data distribution service for cyber-physical systems [J]. IEEE Transactions on Industrial Informatics, 2012, 8(2): 393-405. This work is partially supported by the Key Projects of Hubei Province Natural Science Foundation (2014CFA050). LIU Bei, born in 1993, M. S. candidate. His research interests include big data application, mobile Internet. TAN Xinming, born in 1961, Ph. D., professor. His research interests include software engineering method, Internet of things technology and system. CAO Wenbin, born in 1991, M. S. candidate. His research interests include mobile Internet, processing platform of big data environment. Dynamic resource allocation strategy in Spark Streaming LIU Bei*, TAN Xinming, CAO Wenbin (SchoolofComputerScience&Technology,WuhanUniversityofTechnology,WuhanHubei430063,China) The existing resource allocation strategy has long resource adjustment cycle and cannot sufficiently meet the individual needs of different applications and users when Spark Streaming is selected as stream processing component in hybrid large-scale computing platform. In order to solve the problems, a Dynamic Resource Allocation strategy for Multi-application (DRAM) was proposed. The global variables were added to control the dynamic resource allocation process in DRAM. Firstly, the historical data feedback and the global variables were obtained. Then, whether increasing or decreasing the number of resources in each application was determined. Finally, the increase or decrease of resources was implemented. The experimental results show that, the proposed strategy can effectively adjust the resource quota, and reduce the processing delay compared with the original Spark platform strategies such as Streaming and Core under both cases of the stable data stream and the unstable data stream. The proposed strategy can also improve the utilization rate of the cluster resources. Spark; real-time data stream; multi-application; dynamic resource allocation 2016- 11- 25; 2016- 12- 22。 基金項目:湖北省自然科學(xué)基金重點項目(2014CFA050)。 劉備(1993—),男,湖北仙桃人,碩士研究生,主要研究方向:大數(shù)據(jù)應(yīng)用、移動互聯(lián)網(wǎng); 譚新明(1961—),男,湖北荊州人,教授,博士,主要研究方向:軟件工程方法、物聯(lián)網(wǎng)技術(shù)及系統(tǒng); 曹文彬(1991—),男,河南許昌人,碩士研究生,主要研究方向:移動互聯(lián)網(wǎng)、大數(shù)據(jù)環(huán)境下處理平臺。 1001- 9081(2017)06- 1574- 06 10.11772/j.issn.1001- 9081.2017.06.1574 TP391.1 A3 Spark Streaming動態(tài)資源分配分析
4 實驗結(jié)果與分析
5 結(jié)語