亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        分布式環(huán)境中的多作業(yè)執(zhí)行調(diào)度策略與優(yōu)化*

        2021-06-25 09:46:02季航旭趙宇海王國仁
        計算機工程與科學 2021年6期
        關鍵詞:算子集群分布式

        季航旭,姜 蘇,趙宇海,吳 剛,王國仁

        (1.東北大學計算機科學與工程學院,遼寧 沈陽 110819;2.北京理工大學計算機學院,北京 100081)

        1 引言

        隨著信息技術的快速發(fā)展,各個領域積累的數(shù)據(jù)量日漸增多。數(shù)據(jù)量的增加以及數(shù)據(jù)挖掘算法的研究與普及,使得人們越來越重視數(shù)據(jù)中隱含的價值,因此如何快速地從數(shù)據(jù)中獲取有價值的信息成為各個研究領域的關注點。為了應對快速增長的數(shù)據(jù),人們開發(fā)出了一代又一代大數(shù)據(jù)處理系統(tǒng)并產(chǎn)生了大量相關的優(yōu)化技術。目前比較流行的大數(shù)據(jù)處理系統(tǒng)有Hadoop[1]、Storm[2]、Samza[3]、Spark[4,5]和Flink[6]等,它們都采用分布式集群的方式進行平臺的搭建和系統(tǒng)的部署,并有著各自獨特的優(yōu)勢。

        目前,大數(shù)據(jù)計算系統(tǒng)已經(jīng)普及,基于它們的數(shù)據(jù)查詢和數(shù)據(jù)分析等任務也日益復雜化、多樣化,如實時智能推薦、復雜事件處理等。分布式計算系統(tǒng)經(jīng)常面臨的挑戰(zhàn)是資源分配與作業(yè)調(diào)度,這是分布式環(huán)境與生俱來的問題。由于分布式環(huán)境存在計算資源異構、帶寬異構和單個作業(yè)內(nèi)部計算方式復雜等情況,作業(yè)執(zhí)行過程中經(jīng)常出現(xiàn)由于資源分配不合理、調(diào)度優(yōu)化不足導致的效率低、吞吐量低等缺點。更加令人堪憂的是,分布式計算具有計算結點規(guī)模大、計算任務復雜等特點,計算引擎往往要同時運行復雜繁多的分布式多作業(yè),也就是所謂的分布式多作業(yè)。分布式多作業(yè)相比單作業(yè)在作業(yè)執(zhí)行過程中將更加不利于計算資源的充分利用,這對于分布式大數(shù)據(jù)任務的執(zhí)行將更加雪上加霜。目前,仍然沒有一個完美的資源分配與管理機制滿足分布式多作業(yè)的需求,因此資源的合理分配與回收仍然是提升大數(shù)據(jù)處理系統(tǒng)計算性能的關鍵。

        現(xiàn)在最常用的大數(shù)據(jù)計算系統(tǒng)(如Flink、Spark)都是以多層執(zhí)行圖(Graph)的方式表示作業(yè)的具體信息與執(zhí)行過程。多層執(zhí)行圖是計算系統(tǒng)在作業(yè)提交與作業(yè)執(zhí)行之間生成的一系列有向無環(huán)圖DAG(Directed Acyclic Graph),也是計算引擎中最核心的數(shù)據(jù)結構,它們決定了分布式作業(yè)在每個節(jié)點上的資源部署。也就是說,分布式任務的執(zhí)行都是根據(jù)執(zhí)行圖中的信息在每個節(jié)點上進行任務部署。因此,如何在多作業(yè)執(zhí)行過程中使DAG的合并達到最優(yōu),以及如何優(yōu)化作業(yè)的提交順序與調(diào)度策略,將是高效執(zhí)行多作業(yè)的重要保障。

        本文通過對主流的大數(shù)據(jù)處理系統(tǒng)的研究和探索,結合目前流行的大數(shù)據(jù)處理系統(tǒng)優(yōu)化技術,提出并實現(xiàn)了在作業(yè)層面上的多作業(yè)合并算法與調(diào)度策略。本文的主要貢獻點在于:

        (1)提出了一種啟發(fā)式作業(yè)合并算法。通過采集到的作業(yè)特征,以作業(yè)并行度為基礎分析DAG結構上的差異性,合并浪費資源的作業(yè),釋放占用資源較少的作業(yè)資源,提高集群資源的利用率。

        (2)提出了一種基于負載均衡的多作業(yè)調(diào)度算法。根據(jù)基于作業(yè)特征的多路K-means聚類算法的分析結果使用基于負載均衡的多作業(yè)自平衡輪詢調(diào)度算法提交作業(yè),進一步達到系統(tǒng)負載均衡。

        (3)使用目前最新一代大數(shù)據(jù)計算系統(tǒng)Flink對本文提出的作業(yè)合并算法與多作業(yè)調(diào)度算法的有效性進行了驗證。結果表明,2種作業(yè)合并算法都可以減少作業(yè)的運行時間,提高系統(tǒng)吞吐量;基于負載均衡的多作業(yè)調(diào)度算法在最好情況下可減少40%的線程啟動數(shù)。

        2 相關工作

        2.1 DAG計算模型

        DAG是分布式計算領域中很常見的一種數(shù)據(jù)結構,通常由一系列用戶自定義的算子組成,在各種大數(shù)據(jù)處理系統(tǒng)中都能發(fā)現(xiàn)它的身影,比如Storm、Spark和Flink等。DAG計算將計算任務分解成為若干個子任務[7],并將這些子任務之間的邏輯關系或順序構建成DAG結構。大數(shù)據(jù)計算引擎中的DAG計算通常可以抽象為3層結構:應用表達層、執(zhí)行引擎層和物理執(zhí)行層。應用表達層位于最上層,定義相關算子和轉(zhuǎn)換,將計算任務分解成由若干子任務形成的DAG結構,其優(yōu)點是表達的便捷性,便于開發(fā)者快速描述或構建大數(shù)據(jù)應用。執(zhí)行引擎層介于應用表達層和物理執(zhí)行層之間,將應用表達層構建的DAG計劃任務通過轉(zhuǎn)換和映射,部署到下層的物理機集群中運行,任務的調(diào)度[8]、底層的容錯恢復機制、數(shù)據(jù)與集群信息的傳遞等都要依賴執(zhí)行引擎層。下層是物理執(zhí)行層,即物理集群。

        2.2 Flink中的DAG

        Flink是Apache 開發(fā)的一個同時用于處理批數(shù)據(jù)和流數(shù)據(jù)的有狀態(tài)的計算框架和分布式處理引擎。Flink使用4層DAG結構來描述和表達作業(yè)的執(zhí)行流程,每一層都對作業(yè)執(zhí)行流程做了不同程度的封裝、優(yōu)化和相關屬性的配置。DAG結構是Flink作業(yè)執(zhí)行和部署的核心,主要包含數(shù)據(jù)流圖(StreamGraph)、作業(yè)圖(JobGraph)、執(zhí)行圖(ExecutionGraph)和物理執(zhí)行圖,F(xiàn)link正是通過這4層圖結構把整個作業(yè)的優(yōu)化、資源分配和算子部署進行分離。Flink的4層DAG結構如圖1所示。

        Figure 1 Four-layer DAG structure of Flink

        圖1中,數(shù)據(jù)流圖是用戶通過API接口編寫的、用來表達用戶所要執(zhí)行的計劃任務的邏輯結構。作業(yè)圖是在數(shù)據(jù)流圖的基礎上進行優(yōu)化以及調(diào)整各種參數(shù)配置后的數(shù)據(jù)結構,它裹挾著作業(yè)運行期間所需的必要信息。這些信息被客戶端提交到集群中的協(xié)調(diào)中心,即作業(yè)管理器(JobManager)。執(zhí)行圖可以被視作并行化的作業(yè)圖,當接收到一個新的作業(yè)圖時,會把其中的每一個算子按照其并行度轉(zhuǎn)化成多個可實際部署的子任務(在執(zhí)行圖中以Execution表示)。當執(zhí)行圖中的一系列子任務真正在從結點機器上運行的時候,才會構成物理執(zhí)行圖。

        2.3 多作業(yè)執(zhí)行與調(diào)度研究現(xiàn)狀

        目前最流行的大數(shù)據(jù)處理平臺默認情況下都以FIFO的形式調(diào)度作業(yè)。Wang等[9]為了解決在虛擬化云環(huán)境中同時運行的多個作業(yè)之間的干擾問題,開發(fā)了數(shù)據(jù)驅(qū)動分析模型,估計多個作業(yè)之間的干擾對作業(yè)執(zhí)行時間的影響,并為此提出了一種干擾感知作業(yè)調(diào)度算法。黃廷輝等[10]通過對分布式系統(tǒng)關鍵技術的分析,得出I/O和CPU的不匹配是影響計算性能的一個重要因素,提出合并文件的運行方式,可以減少緩存文件的數(shù)量,提高I/O效率,不過仍存在內(nèi)存成本高的弊端。

        Flink系統(tǒng)中資源是按處理槽(Slot)進行劃分的,支持多種已有的成熟的資源管理器,例如Yarn和Mesos等。Storm作為曾經(jīng)最流行的流式大數(shù)據(jù)處理系統(tǒng),默認是采用輪詢的調(diào)度方式管理作業(yè)的[11]。Qian等[12]為了解決Storm集群中擴展更多新計算機時帶來的負載不均衡問題,設計了S-Storm,為負載均衡群集中均勻分配Slot??傊?,目前的分布式作業(yè)調(diào)度算法和資源分配算法都是基于作業(yè)對資源的需求或者集群中結點資源的使用情況,進行作業(yè)的調(diào)度和資源的分配的,它們面向的是單個作業(yè),并沒有考慮作業(yè)間的關系對集群性能的影響。

        3 基本概念

        一個復雜的DAG通常由多種類型的算子組成,有些算子只涉及本地運算,它們以內(nèi)存共享的方式傳輸數(shù)據(jù),運行效率高,給系統(tǒng)增加的負載小。也有些算子會通過網(wǎng)絡協(xié)議棧傳輸數(shù)據(jù),除了網(wǎng)絡本身的不可靠性會增加延遲,還有網(wǎng)絡緩沖數(shù)據(jù)、序列化、反序列化和用戶態(tài)/內(nèi)核態(tài)之間的切換等耗時操作持續(xù)地占用系統(tǒng)資源。為了便于描述,本文定義了全局算子和本地算子這2個概念。

        定義1(全局算子) 全局算子指在分布式集群中,需要從其他結點獲取數(shù)據(jù)進行處理的算子,如Join和Reduce等。

        定義2(本地算子) 本地算子指在分布式集群環(huán)境中,不需要從其他結點獲取數(shù)據(jù),只對本地數(shù)據(jù)進行處理的算子,如Filter、Map和FlatMap等。

        本文在研究作業(yè)合并和作業(yè)調(diào)度時需要提取DAG的相關特征量,計算作業(yè)之間的差異性并通過聚類算法對作業(yè)進行區(qū)分。聚類算法是一種經(jīng)典的群分析方法[13],它以數(shù)據(jù)間距度量數(shù)據(jù)相似性[14],把相似的數(shù)據(jù)集中到一起,是一種發(fā)現(xiàn)數(shù)據(jù)集內(nèi)部結構特征的無監(jiān)督學習算法[15]。聚類算法按聚類思想可以分為:劃分法聚類、密度法聚類[16]、圖論聚類法[17]和網(wǎng)格法聚類等。

        本文采用的K-means算法是一種典型的劃分聚類法,其思想是預先指定聚類數(shù)目和聚類中心,計算點與點之間的距離,把每一個點歸類到與其距離最近的聚類中心。距離的度量方式很多,本文使用歐氏距離(式(1))和曼哈頓距離(式(2))相結合的方式度量作業(yè)之間的距離,其中n為樣本點維度。

        (1)

        (2)

        歐氏距離從幾何空間的角度衡量元素間的距離,常用于測量度量標準一樣的數(shù)據(jù)間的距離;曼哈頓距離用來計算數(shù)據(jù)在多維屬性上的差之和,可以減弱離群數(shù)據(jù)帶來的影響。

        4 基于啟發(fā)的作業(yè)合并算法

        本節(jié)詳細介紹基于啟發(fā)的作業(yè)合并算法。首先對作業(yè)進行分析,解析作業(yè)的DAG圖,以及作業(yè)任務量與作業(yè)分配到的內(nèi)存資源之間的關系;然后分別采用基于并行度的作業(yè)合并算法和基于DAG結構差異性的作業(yè)合并算法,對占用系統(tǒng)內(nèi)存資源較多的作業(yè)進行合并,從而提高系統(tǒng)的吞吐量。

        4.1 作業(yè)相關特征的提取

        本文采用廣度優(yōu)先遍歷的方式提取作業(yè)執(zhí)行圖中相關的信息,一個典型的作業(yè)執(zhí)行圖如圖2所示,主要包含以下信息:數(shù)據(jù)源文件路徑、作業(yè)并行度和算子總數(shù)等。

        Figure 2 Job execution graph

        處理的數(shù)據(jù)量和作業(yè)分配到的內(nèi)存資源需要通過計算獲得。算法根據(jù)文件路徑信息訪問文件大小,從系統(tǒng)配置文件中讀取為Slot分配的內(nèi)存大小。作業(yè)的分類貫穿于信息采集過程,算法根據(jù)數(shù)據(jù)來源、文件大小、作業(yè)分配到的內(nèi)存資源大小和作業(yè)的執(zhí)行邏輯將作業(yè)分為可合并型作業(yè)與不可合并型作業(yè)。在作業(yè)執(zhí)行流的遍歷過程中,算法以矩陣結構存儲頂點間的連接信息,元素值的大小表示算子間的連接數(shù)。表1是對圖2的信息提取。

        Table 1 Statistics of the number of connections between operators

        4.2 基于并行度的作業(yè)合并算法

        并行度決定了作業(yè)在執(zhí)行時所占集群內(nèi)存資源的總量,且和集群中的Slot是對應的,意味著并行度相同的作業(yè)將分配到相同大小的內(nèi)存資源。因此,對于沒有充分占用內(nèi)存資源的作業(yè),合并并行度相同的作業(yè),可使2個作業(yè)共用1個作業(yè)的內(nèi)存資源,同時不會對作業(yè)執(zhí)行邏輯造成影響。

        影響作業(yè)執(zhí)行的因素有很多,定義3~定義5的3個度量:任務量大小比值(F)、DAG最大深度比值(D)和DAG全局算子數(shù)比值(G),決定作業(yè)的特征。

        定義3(任務量大小比值(F)) 任務量大小比值是表示2個作業(yè)處理任務量大小差異性的重要指標之一,其計算如式(3)所示:

        (3)

        其中,x和y分別表示2個作業(yè)所處理的數(shù)據(jù)集數(shù)量,wf_mi、wf_mj分別表示2個不同作業(yè)處理的文件集合中單個文件的大小。通過實驗得知,F(xiàn)的閾值取值為[0.5,2]。

        定義4(DAG最大深度比值(D)) 表示2個作業(yè)的執(zhí)行圖中最長算子鏈長度的比值,它是反映2個作業(yè)DAG差異性最明顯的指標,其計算如式(4)所示:

        (4)

        其中,dept_m和dept_n分別表示2個作業(yè)執(zhí)行圖的最大深度。DAG深度越大的作業(yè)執(zhí)行時間越長,因此合并后的作業(yè)在數(shù)據(jù)量相當?shù)那闆r下,其執(zhí)行時間取決于合并前DAG深度較大的作業(yè)。D的閾值取值為[0.5,2]。

        定義5(DAG全局算子數(shù)比值(G)) 表示2個作業(yè)圖在全局算子數(shù)量上的差異。全局算子和數(shù)據(jù)傳輸緊密相關,是影響作業(yè)執(zhí)行速度的重要指標之一,體現(xiàn)2個作業(yè)在傳輸上的差異。其計算如式(5)所示:

        (5)

        其中,G表示2個并行度相同的作業(yè)的全局算子數(shù)的比值,gol_m和gol_n分別表示2個作業(yè)中全局算子的個數(shù)。DAG中全局算子的個數(shù)越多,執(zhí)行時間越長。通過實驗得知,G的閾值取值為[0.5,2]。

        基于并行度的作業(yè)合并算法執(zhí)行過程如算法1所示。

        算法1基于并行度的作業(yè)合并算法

        輸入:待合并作業(yè)j;不包含j的待合并作業(yè)集合Jobs。

        輸出:合并后的作業(yè)mergeJob。

        1.forjobinJobsdo

        2.ifjob.parallelism==j.parallelism

        3.計算j與job任務量比值F;

        4.ifF∈[0.5,2]do

        5. 計算j與job的DAG圖最大深度比值D;

        6.ifD∈[0.5,2]do

        7. 計算j與job的全局算子的比值G;

        8.endif

        9.ifG∈[0.5,2]do

        10.mergeJob=merge(j,job);

        11.removejobfromJobs,returnmergeJob;

        12.endif

        13.endif

        14.endif

        15.endfor

        (1)首先從待合并作業(yè)緩沖池的作業(yè)集中取出一個作業(yè)j,然后遍歷Jobs,從中取出一個與j并行度相同的作業(yè)job。

        (2)使用3個度量值衡量作業(yè)job與j的匹配程度,如果job與j在上述3個比值上都能落到對應的閾值空間,兩者匹配,調(diào)用merge函數(shù)合并job與j,返回合并后的結果,終止循環(huán);否則繼續(xù)循環(huán)。

        (3)循環(huán)結束后,檢查mergeJob的值是否為空,如果mergeJob的值為空,說明Jobs中沒有與j并行度相同并且符合3個條件的job,那么j會轉(zhuǎn)而參與基于DAG圖結構差異性的作業(yè)合并計算。

        4.3 基于DAG結構差異性的作業(yè)合并算法

        對于作業(yè)緩沖池中剩余的由于F、D、G取值落在閾值空間以外而無法合并的作業(yè),采用基于DAG結構差異性的作業(yè)合并算法處理。

        算法以DAG結構差異性為切入點,Slot只隔離內(nèi)存資源,因此為了避免作業(yè)對CPU資源的爭搶,盡量選擇異構程度高的作業(yè)進行合并。算法增加2個度量為基于DAG結構差異性的作業(yè)合并算法提供支持。

        定義6(作業(yè)并行度比值(P)) 作業(yè)并行度是作業(yè)最明顯的特征之一,并行度比值是衡量2個作業(yè)在并行度上的差異最明顯的指標。其計算如式(6)所示:

        (6)

        其中,P表示2個作業(yè)并行度的比值,parallelism_m和parallelism_n表示2個作業(yè)的并行度。并行度是對應于集群中的Slot數(shù)量,因此基于DAG的作業(yè)合并算法在合并作業(yè)時首先需要考慮的就是作業(yè)并行度。通過實驗得知,P的閾值取值為[0.5,2]。

        定義7(DAG結構相似性(S)) DAG結構相似性反映2個作業(yè)在執(zhí)行邏輯上的差異,以歐氏距離為基礎定義了DAG結構相似性,其計算如式(7)所示:

        (7)

        其中,o表示算子的數(shù)量。

        在特征提取過程中使用矩陣保存作業(yè)執(zhí)行流程圖的基本信息,M和N分別表示存儲作業(yè)執(zhí)行流程圖基本信息的矩陣,Mij和Nij分別表示矩陣中的元素。算法執(zhí)行過程如算法2所示。

        算法2基于DAG結構差異性的作業(yè)合并算法

        輸入:待合并作業(yè)j;不包含j的待合并作業(yè)集合Jobs。

        輸出:合并后的作業(yè)mergeJob。

        1.按并行度大小給Jobs中的作業(yè)從小到大排序

        2.中間作業(yè)集合為midJobs;

        3.forjobinJobsdo

        4. 計算j與job任務量比值F;

        5.ifF∈[0.5,2]do

        6. 計算j與job的DAG圖最大深度比值D

        7.ifD∈[0.5,2]do

        8. 計算j與job的全局算子的比值G

        9.ifG∈[0.5,2]do

        10. 計算j與job并行度比值P

        11.ifP∈[0.5,2]do

        12. addjobtomidJobs

        13.endif

        14.endif

        15.endif

        16.endif

        17.endfor

        18.forjobinmidJobs

        19.計算j與jobDAG圖矩陣間的歐氏距離U;

        20.更新U獲取最小值,并記錄相應的job;

        21.endfor

        22.mergeJob=merge(j,job)

        23.returnmergeJob

        (1)從待合并作業(yè)中取出一個作業(yè)j,然后遍歷Jobs,獲取一個與j并行度相同的作業(yè)job;

        (2)在循環(huán)中使用4個度量值衡量作業(yè)job與作業(yè)j的匹配程度,如果符合對應的閾值空間,則把作業(yè)job加入到中間作業(yè)集midJobs中;

        (3)遍歷中間作業(yè)集合midJobs,使用歐氏距離從中間數(shù)據(jù)集合中選出與作業(yè)j在歐氏距離上相似度最小的作業(yè)job,合并作業(yè)j與job并返回結果。

        5 基于負載均衡的多作業(yè)調(diào)度算法

        除了作業(yè)合并之外,作業(yè)的執(zhí)行順序與調(diào)度策略也是影響多作業(yè)執(zhí)行效率的重要因素。因此,本文提出基于負載均衡的多作業(yè)調(diào)度算法,其由3部分組成:

        (1)預處理模塊:進行相關特征的提取工作,包括作業(yè)并行度、算子個數(shù)和算子類型等;(2)分類模塊:采用K-means聚類算法根據(jù)提取的特征信息對作業(yè)進行聚類分析,聚類算法在負載均衡方面應用廣泛[18,19],經(jīng)過聚類把作業(yè)分成3個類別:大作業(yè)、中等作業(yè)和小作業(yè);(3)調(diào)度模塊:調(diào)度模塊根據(jù)聚類結果,使用自平衡輪詢調(diào)度算法計算作業(yè)的提交順序,同時充分利用集群的Slot資源,防止Slot閑置。

        5.1 作業(yè)相關特征的提取

        基于負載均衡的多作業(yè)調(diào)度算法主要使用作業(yè)并行度、算子總數(shù)、各類型算子個數(shù)和作業(yè)圖深度為參考,通過遍歷對信息進行采集。該算法執(zhí)行過程如算法3所示。

        算法3DAG特征提取算法

        輸入:作業(yè)DAG結構圖Plan。

        輸出:提取到的信息集合infoList。

        1.fornodeinPlando

        2.max=Math.max(max,BFS(node));

        3.ifnodeis not visited

        4. add node’s characters toinfoList,node.visited=true;

        5.node相連接的未被訪問的頂點入隊列Q;

        6.whileQis not emptydo

        7.v=Q頭元素出隊列;

        8.addv’s characters toinfoList,v.visited= true;

        9.v相連接的未被訪問的頂點入隊列Q;

        10.endwhile

        11.endif

        12.infoList.max=max

        13.endfor

        14.returninfoList

        (1)使用深度優(yōu)先搜索DFS(Depth First Search)計算從Sink算子到距離最遠的Source算子的距離,并記錄在max中;如果node頂點未被訪問過,將頂點信息存入infoList中。

        (2)將與node頂點相連的頂點加入隊列Q,如果Q不為空,從Q中取出一個頂點v,將v的信息記錄到infoList中,與v相連的未訪問過的頂點加入隊列。

        (3)更新infoList中的DAG深度,for循環(huán)直到遍歷完Plan中的所有頂點,返回infoList。

        5.2 基于作業(yè)特征的多路聚類分析

        聚類分析模塊將根據(jù)特征信息對作業(yè)進行分類,使用4種數(shù)據(jù)度量作業(yè)之間的相似性,分別是作業(yè)并行度、各類算子個數(shù)、作業(yè)執(zhí)行流程圖深度和全局算子的個數(shù)。算法采用歐氏距離與曼哈頓距離相結合的方式測量作業(yè)間的距離。ope[i]是以數(shù)組的形式存儲,dept、全局算子ops的大小是衡量作業(yè)流程復雜性的度量標準。

        定義8(作業(yè)在不同算子類型上的差異性) 算子及算子類型最能區(qū)分作業(yè)的不同,算子類型的差異性反映了作業(yè)的總體差異性,其計算如式(8)所示:

        (8)

        其中,mope[i]與nope[i]分別為作業(yè)m與作業(yè)n的不同類型的算子的個數(shù)。

        定義9(作業(yè)在DAG結構深度上的差異性) DAG結構深度是作業(yè)最明顯的特征之一,它描述了作業(yè)運行時數(shù)據(jù)流通的最大路徑,其計算如式(9)所示:

        distancedept(m,n)=|mdept-ndept|

        (9)

        其中,mdept與ndept分別為作業(yè)m與作業(yè)n的DAG結構深度。

        定義10(作業(yè)在Task線程數(shù)上的差異性) 作業(yè)在集群中開啟的線程數(shù)直接反映作業(yè)對系統(tǒng)CPU資源的占用量,作業(yè)在Task線程數(shù)上的差異性計算如式(10)所示:

        distancetasknum(m,n)=|mpara*mops-npara*nops|

        (10)

        其中,mpara與npara分別為作業(yè)m與作業(yè)n的并行度,mops與nops分別為作業(yè)m與作業(yè)n的全局算子數(shù)量。

        定義11(作業(yè)的差異性) 本文從3個角度衡量了作業(yè)之間的差異性,其計算如式(11)所示:

        distance(m,n)=distanceope(m,n)+distancedept(m,n)+distancetasknum(m,n)

        (11)

        本文提出的基于作業(yè)特征的多路K-means聚類分析算法如算法4所示。

        算法4基于作業(yè)特征的多路K-means聚類分析算法

        輸入:作業(yè)及其特征集合PlanList。

        輸出:聚類結果ClusterResult。

        1. 根據(jù)并行度乘以算子總數(shù)的大小對PlanList進行排序;

        2. 獲取初始聚類中心點集合;

        3.fori=1 to 3do

        4.center_i=K_means(PlanList,center_i);

        5.endfor

        6.fori=1 to 3do

        7. 計算每個聚類中心點將PlanList劃分的程度;

        8.endfor

        9.center=K_means(PlanList,center);

        10.根據(jù)center將PlanList分組并放入ClusterResult中

        11.returnClusterResult

        (1)對作業(yè)及其特征集合PlanList按并行度乘以算子總數(shù)大小進行排序。

        (2)從排好序的PlanList中選擇3個作業(yè)作為聚類中心;以排好序的PlanList的隊列頭作業(yè)、隊列尾作業(yè)和中間作業(yè)作為聚類中心;從排好序的PlanLsit中分別取隊列頭3個作業(yè)、隊列中間3個作業(yè)、隊列尾部3個作業(yè),取其平均值作為聚類中心。

        (3)調(diào)用K_means算法循環(huán)更新每個聚類中心的值;計算每個聚類中心將PlanList劃分的程度,劃分程度度量標準為,聚類結果每類作業(yè)的數(shù)量越平均越好。選取聚類結果好的2個聚類中心取其平均值,調(diào)用K_means算法進行最后聚類;計算聚類結果,并輸出結果。

        5.3 基于負載均衡的多作業(yè)自平衡輪詢調(diào)度

        通過多路聚類的方式優(yōu)化了聚類中心點的選取,通過基于作業(yè)特征的多路K-means聚類分析可以把作業(yè)集合根據(jù)聚類中心點聚集成3個作業(yè)類別,為算法提供可靠的支持。

        本文以輪詢調(diào)度法[20 - 23]為基礎實現(xiàn)了多作業(yè)的提交優(yōu)化,目的是在不浪費集群Slot資源的情況下,使集群開啟的Task線程數(shù)量保持平穩(wěn),以此達到在多作業(yè)情況下平衡集群性能的目的。集群中作業(yè)工作的線程數(shù)量是由作業(yè)并行度和算子個數(shù)決定的,因此控制作業(yè)的提交順序,可以達到控制集群開啟的Task線程數(shù)量的目的。作業(yè)能否提交成功取決于集群剩余并行度是否滿足作業(yè)的并行度需求,如果作業(yè)的并行度比集群中可用并行度大,作業(yè)就會被拒絕,因此輪詢的作業(yè)提交方式并不會嚴格執(zhí)行,而且集群空閑的Slot資源會隨著作業(yè)的提交和結束動態(tài)地變化。針對這種情況本文設計了自平衡的輪詢調(diào)度算法,如算法5所示。

        算法5基于負載均衡的多作業(yè)自平衡輪詢調(diào)度算法

        輸入:聚類結果ClusterResult;最后的聚類中心center。

        輸出:下一個提交的作業(yè)Job。

        1. 對K-means聚類結果收集排序;

        2. 平分排好序的作業(yè)到3個隊列中,并設置指針p;

        3.翻轉(zhuǎn)隊列midQueue、minQueue,查詢集群剩余Slot;

        4.ifslotNum> 0do

        5.ifjobNum> 0do

        6.pre=p;queue=Queue[p];

        7.whilequeueis not emptydo

        8.max= 0;

        9.foreleminqueuedo

        10.ifelem.parallelism≤slotNumdo

        11.ifmax

        12.job=elem;max=elem.parallelism;

        13.endif

        14.endif

        15.endfor

        16.endwhile

        17.ifmax!= 0do

        18.p=(p++)%3;

        19.endif

        20.ifmax== 0do

        21.p=(p++)%3;

        22.ifp==predo返回 4;

        23.endif

        24. 返回 7;

        25.endif

        26.endif

        27.endif

        (1)對K-means聚類產(chǎn)生的3個集合中的元素按元素距離聚類中心點的距離大小進行排序;比較3個聚類中心點的大小,按聚類中心點的大小,從大到小合并3個排好序的作業(yè)集合。

        (2)將合并后的集合平均分成3份,并放入3個隊列中,將midQueue和minQueue隊列進行逆轉(zhuǎn)。

        (3)每隔5 s查詢一次集群剩余Slot資源,從指針指向的隊列開始,遍歷隊列中的元素找到集群中空閑Slot資源能滿足的最大并行度的作業(yè)提交。每次提交作業(yè)后,修改指針指向下一隊列。

        (4)對3個集合進行判斷,如果出現(xiàn)隊列為空,并且總作業(yè)的數(shù)量大于2,按順序收集3個集合中的隊列,再平分所有的作業(yè)到3個集合中,并更改指針使其指向midQueue,否則不再進行作業(yè)收集。

        6 實驗

        本文使用2種類型的作業(yè)來進行對比實驗,一種是單詞統(tǒng)計(WordCount),另一種是表連接(Join)。因為全局算子中最復雜的算子就是Join類型算子,其他簡單類型的算子使用最多的是Filter、Map和FlatMap,因此WordCount作業(yè)和Join作業(yè)足以覆蓋實際應用中的大部分場景。

        6.1 數(shù)據(jù)集與評估指標

        本文實驗采用大數(shù)據(jù)測試基準TPC-H生成的數(shù)據(jù)集,是事務性能管理委員會TPC(Transaction Processing Performance Council)發(fā)布的權威數(shù)據(jù)庫評測基準,可以保證生成的模擬數(shù)據(jù)具有真實性、客觀性與健壯性。在WordCount實驗中本文選用5個基本的大數(shù)據(jù)集來模擬批處理環(huán)境中的大規(guī)模數(shù)據(jù)處理。在表連接實驗中,本文選取TPC-H生成的Lineitem表和Orders表作為數(shù)據(jù)源,其中Lineitem有16個字段,前3個字段Orderkey、Partkey和Suppkey是主鍵。Orders表有9個字段,前2個字段Orderkey和Custkey是主鍵。

        實驗的評估指標有3個:

        (1)作業(yè)運行時間:在相同硬件條件下,任務量相同、處理邏輯相同的作業(yè)處理速度越快,表明系統(tǒng)性能越好。

        (2)作業(yè)吞吐量:單位時間內(nèi)集群處理的平均數(shù)據(jù)量大小,即被處理的總數(shù)量(totalDataVolume)與運行總時間(totalProcessTime)的比值,其定義如式(12)所示:

        (12)

        (3)集群開啟的最高Task線程數(shù):本文提出的基于負載均衡的多作業(yè)調(diào)度算法以降低集群同一時刻開啟的最高Task線程數(shù)為首要目標。

        6.2 實驗環(huán)境設置

        本文所描述的相關技術細節(jié)均在Flink 1.8.0版本中進行實現(xiàn),實驗運行的軟硬件環(huán)境如下所示:

        (1)硬件環(huán)境:采用的分布式環(huán)境由4臺服務器組成,1臺主結點,3臺從結點,結點之間通過千兆以太網(wǎng)連接。配置為:CPU:Intel Xeon E5-2603 V4 *2,核心數(shù)目:6核心;內(nèi)存:128 GB(從結點64 GB);硬盤:400 GB SSD。

        (2)軟件環(huán)境:操作系統(tǒng):CentOs 7;Flink版本:1.8.0,JDK版本:1.8.0;存儲環(huán)境:Hadoop 2.7.5。

        6.3 實驗結果與分析

        (1)基于并行度的作業(yè)合并算法實驗。

        作業(yè)合并算法實驗對一對相同的WordCount作業(yè)和一對不同的Join作業(yè)分別進行順序執(zhí)行和合并執(zhí)行。表2展示了作業(yè)的基本信息。

        Table 2 Job sets information for experiment 1

        圖3對比了2個WordCount作業(yè)在相同實驗環(huán)境、相同數(shù)據(jù)集上順序執(zhí)行和合并執(zhí)行的執(zhí)行結果。其中圖3a對比了執(zhí)行時間,合并執(zhí)行的執(zhí)行時間減少了5%~23%。在內(nèi)存資源足夠使用的前提下,單個WordCount程序?qū)篊PU的利用沒有達到時刻滿負荷運行的狀態(tài),所以作業(yè)合并不僅能提高集群的內(nèi)存資源利用,也能提升集群CPU資源的利用。圖3b對比了吞吐量,采用了作業(yè)合并算法后系統(tǒng)可以更快地到達吞吐量峰值。

        Figure 3 Results of WordCount job merging based on the number of parallelism

        圖4對比了Join1和Join2作業(yè)在相同實驗環(huán)境、相同數(shù)據(jù)集上順序執(zhí)行和合并執(zhí)行的執(zhí)行結果。其中圖4a對比了運行時間,圖4b對比了系統(tǒng)吞吐量。盡管效果不如WordCount作業(yè)明顯,但基于并行度的作業(yè)合并算法對運行時間仍有一定縮減,吞吐量提升效果在4.5%~20%。

        Figure 4 Results of Join job merging based on the number of parallelism

        (2)基于DAG結構差異的作業(yè)合并算法實驗。

        實驗先后提交了2個并行度不同的WordCount作業(yè)和Join作業(yè),來模擬基于DAG結構差異性的作業(yè)合并。

        圖5和圖6從運行時間和吞吐量2個方面展示了作業(yè)合并算法的提升效果。合并執(zhí)行的執(zhí)行時間明顯低于順序執(zhí)行的總時間,并且差距明顯,因為本實驗不是在滿并行度的條件下進行的,實際執(zhí)行時可能會出現(xiàn)不同情況,對于WordCount作業(yè),基于DAG結構差異性的作業(yè)合并算法具有明顯的優(yōu)勢。

        Figure 5 Results of WordCount job merging based on DAG structure difference

        Figure 6 Results of Join job merging based on DAG structure difference

        (3)基于負載均衡的多作業(yè)調(diào)度算法實驗

        對于多作業(yè)調(diào)度算法,實驗以4個作業(yè)為基礎,表3列出了作業(yè)算子的基本信息,這些作業(yè)特征信息是衡量作業(yè)之間差異性的關鍵。實驗模擬了7個任務量大小不同的作業(yè),采用隨機的方式模擬了作業(yè)的提交順序,將其執(zhí)行結果與多作業(yè)調(diào)度算法的結果進行比較。表4展示了作業(yè)對應的編號以及其處理任務量信息,表5展示了優(yōu)化前后作業(yè)的提交順序。

        Table 3 Job sets information for experiment 3

        Table 4 Job number and corresponding processing task volume

        Table 5 Order of job submission

        圖7展示了基于負載均衡的多作業(yè)調(diào)度算法的提升效果。從圖7a可以看出,通過調(diào)優(yōu)作業(yè)的提交順序可縮短作業(yè)處理的時間,但存在某些按FIFO提交模式的順序比優(yōu)化后的輪詢提交順序要好,該情況的出現(xiàn)是因為算法在執(zhí)行過程中并未考慮到任務量的大小。從圖7b可以看出,基于負載均衡的多作業(yè)調(diào)度算法在吞吐量性能上提升了5%左右。圖7c 展示了集群開啟的Task線程數(shù)對比,基于負載均衡的多作業(yè)調(diào)度算法執(zhí)行作業(yè)集時,集群開啟的最大線程數(shù)在多數(shù)情況下有所減少,最好情況下減少了40%的線程。

        Figure 7 Running results of multi-job scheduling algorithm based on load balancing

        7 結束語

        本文通過分析作業(yè)與系統(tǒng)資源之間的關系,以及作業(yè)與作業(yè)之間的關系,提出并實現(xiàn)了提高集群資源利用率和負載均衡能力的算法,本文提出的優(yōu)化主要包含2個方面:

        (1)提出了啟發(fā)式的作業(yè)合并算法,通過分析作業(yè)任務量和作業(yè)分配到的集群資源之間的關系,合并對集群資源利用率低的作業(yè),使它們共用同一個作業(yè)的資源。該算法解決了集群部分作業(yè)資源利用率低的問題,并通過實驗驗證了該算法在不同類型作業(yè)上對集群性能提升的有效性。

        (2)提出了基于負載均衡的多作業(yè)調(diào)度算法,首先對作業(yè)進行特征提取;然后通過多路K-means聚類算法將作業(yè)分為3類:大作業(yè)、中等作業(yè)和小作業(yè);之后采用自平衡輪詢調(diào)度算法提交分類好的作業(yè),保證大作業(yè)不會在集群中集中執(zhí)行,降低了集群由于開啟過多線程造成集群性能下降的概率,并通過實驗驗證了該算法的有效性。

        分布式系統(tǒng)在多作業(yè)執(zhí)行層面還有許多需要優(yōu)化和提高的部分,未來可繼續(xù)研究的問題有:

        (1)動態(tài)調(diào)度。目前的分布式大數(shù)據(jù)處理系統(tǒng)未能做到在作業(yè)執(zhí)行過程中動態(tài)地調(diào)整作業(yè)的執(zhí)行流程,這種方式不利于資源的動態(tài)回收與共享。針對這一問題,系統(tǒng)需要做出相應的優(yōu)化和改進。

        (2)優(yōu)化多作業(yè)并行度。并行度是作業(yè)執(zhí)行的關鍵,目前在分布式大數(shù)據(jù)處理平臺的應用中,一般都是從業(yè)人員根據(jù)數(shù)據(jù)與業(yè)務特性手動優(yōu)化并行度,這樣就給并行度的優(yōu)化帶來了很多困難。因此,研究和設計出一套并行度設置的優(yōu)化方案,也是分布式大數(shù)據(jù)系統(tǒng)應用方面的一個研究課題。

        猜你喜歡
        算子集群分布式
        擬微分算子在Hp(ω)上的有界性
        各向異性次Laplace算子和擬p-次Laplace算子的Picone恒等式及其應用
        海上小型無人機集群的反制裝備需求與應對之策研究
        一類Markov模算子半群與相應的算子值Dirichlet型刻畫
        一種無人機集群發(fā)射回收裝置的控制系統(tǒng)設計
        電子制作(2018年11期)2018-08-04 03:25:40
        分布式光伏熱錢洶涌
        能源(2017年10期)2017-12-20 05:54:07
        分布式光伏:爆發(fā)還是徘徊
        能源(2017年5期)2017-07-06 09:25:54
        Python與Spark集群在收費數(shù)據(jù)分析中的應用
        勤快又呆萌的集群機器人
        Roper-Suffridge延拓算子與Loewner鏈
        日本在线一区二区三区视频| 欧美在线视频免费观看| 亚洲欧美日韩中文字幕网址| 日本熟妇视频在线中出| av剧情演绎福利对白| 亚洲一区二区三区播放| 日韩中文字幕中文有码| 亚洲中文字幕有综合久久| 日韩在线精品视频一区| 欧洲美女黑人粗性暴交| 亚洲人成7777影视在线观看| 亚洲国产日韩综一区二区在性色| 久草福利国产精品资源| 性裸交a片一区二区三区| 精品国产网红福利在线观看| 在线看不卡的国产视频| 精品激情成人影院在线播放| 男女啪啪无遮挡免费网站| 婷婷第四色| 亚洲性av少妇中文字幕| 曰韩无码无遮挡a级毛片| 亚洲熟女少妇一区二区| 亚洲国产不卡av一区二区三区| 少妇太爽了在线观看免费| 国产人妻丰满熟妇嗷嗷叫| 日本亚洲欧美在线观看| 精品蜜臀国产av一区二区| 亚洲精品无码不卡在线播he| 久久久久亚洲av无码专区导航| 无码AV午夜福利一区| 第一九区另类中文字幕| 国产精品r级最新在线观看| 男女一级毛片免费视频看| 日本一区二区三区在线观看视频| 亚洲成av人综合在线观看| 精品久久久久久中文字幕大豆网| 国产杨幂AV在线播放| 亚洲av综合av一区| 国产综合无码一区二区色蜜蜜| 男人阁久久| 91九色中文视频在线观看|