闞京+陳彩+梁毅
摘 要:伴隨著數(shù)據(jù)的爆炸式增長,越來越多的大數(shù)據(jù)業(yè)務分析與處理選擇分布式計算平臺。目前針對大數(shù)據(jù)的分布式計算框架都支持DAG式的任務編排。由于大數(shù)據(jù)采集來源以及分布式存儲系統(tǒng)特點,很多使用DAG框架進行計算的應用都是增量式的大數(shù)據(jù)集,但現(xiàn)有的DAG框架對這樣的數(shù)據(jù)集進行計算時有許多冗余,造成計算資源浪費。提出了在DAG框架上進行增量式復用的方法,并針對FILTER算子特點提出了基于FILTER算子匹配的間接復用機制。
關鍵詞:增量計算;分布式計算;計算復用;查詢優(yōu)化;DAG計算
DOIDOI:10.11907/rjdk.171282
中圖分類號:TP301
文獻標識碼:A 文章編號:1672-7800(2017)007-0026-03
0 引言
利用大數(shù)據(jù)進行業(yè)務分析與處理越來越多,Google的Map-Reduce、Apache Hadoop MR以及Apache Spark是應用甚廣的分布式計算框架[1-2],即便缺乏分布式系統(tǒng)經(jīng)驗的大數(shù)據(jù)工作者,也能充分利用分布式計算所帶來的性能提升。
早期這些分布式計算框架主要用于批處理作業(yè),隨著數(shù)據(jù)量的增加以及業(yè)務需求的演進,更復雜高效的數(shù)據(jù)操作手段成為大數(shù)據(jù)分析與處理的重要需求。Pig、Hive以及Spark SQL都提供了相較于MPI及Map-Reduce等傳統(tǒng)分布式計算手段更高層次的分布式數(shù)據(jù)集接口,通過這些接口可以更輕松地對大數(shù)據(jù)集進行分析與處理[3–5]。
大數(shù)據(jù)計算平臺的分布式存儲系統(tǒng),如GFS、HDFS都具備一些共同特征:數(shù)據(jù)的更新主要通過附加新數(shù)據(jù)方式完成,而這類增量式數(shù)據(jù)進行處理時,往往會產(chǎn)生較多的冗余計算[6-7]。
本文提出了在增量計算中進行間接復用的方法,并通過解構(gòu)、分析FILTER算子方式,達成比算子級更低粒度的計算復用,從而提升算子復用機會,使整體計算效率得到提升。
1 相關工作
增量計算中的復用方法有以下兩種方式:
(1)對結(jié)果集進行局部更新,即通過針對特定的分布式作業(yè)管線設計一系列動態(tài)算法,并依據(jù)該算法對此前的結(jié)果集進行小規(guī)模的局部更新,從而使剩余大部分結(jié)果集得以復用。Google設計的可用來更新Page Rank的Percolator就是采用這種方法的代表[8]。該種方式主要針對特定的分布式計算任務。
(2)對作業(yè)管線中的子任務進行緩存及復用,從而透明地重用此前的計算結(jié)果,由系統(tǒng)自動地將可復用的子任務計算結(jié)果進行緩存,在緩存命中的情況下,就可避免相關子任務計算。
ReStore通過分析并改寫作業(yè)的物理計劃達到復用之前計算的目的[9]。ReStore不僅可復用完整的計算作業(yè),還可復用作業(yè)中的子作業(yè),有效提升了復用發(fā)生幾率。
與Restore不同,Nectar系統(tǒng)所對應的輸入是DAG作業(yè),而非傳統(tǒng)的MR作業(yè)[10]。Nectar在DryadInc的兩個DAG作業(yè)復用算法IDE及MER基礎上,闡述了整個復用體系的工作機制,并重點闡述了對數(shù)據(jù)中心的數(shù)據(jù)管理方法[7],在復用粒度上與Restore相似。Nectar在查詢?nèi)蝿盏倪壿嬘媱澤线M行分析與復用優(yōu)化,將計算邏輯與計算數(shù)據(jù)進行復合并作為緩存鍵,將計算結(jié)果存儲于分布式文件系統(tǒng)中,并通過緩存服務器對其進行索引。
2 基于FILTER算子匹配的間接復用
2.1 基本概念
大數(shù)據(jù)規(guī)模龐大,在以事務為中心的數(shù)據(jù)庫中難以進行批量計算和處理,非關系式的數(shù)據(jù)模型索引建立和維護較為困難,在檢索上往往面臨性能上的挑戰(zhàn)。目前分布式計算框架是通過多節(jié)點的并行運算來提升計算效能的。
增量數(shù)據(jù)往往是對事物的客觀記錄和描述,通常都是歷史信息,這類數(shù)據(jù)具有不會更新、刪除的特點。增量型數(shù)據(jù)可定義為:
其中,D(t)表示t時刻數(shù)據(jù)集。當需要多次對這樣的數(shù)據(jù)集進行非連接型操作(例如過濾、投影以及聚合操作等)時,往往要付出巨大代價。為了針對增量式數(shù)據(jù)設計增量式DAG計算復用策略,首先對DAG中的算子進行劃分。在這些算子中,可合并計算算子可以有效地進行增量計算復用??珊喜⒂嬎闼阕佣x為這樣一類函數(shù):
則稱f為可合并計算算子, merge為其合并函數(shù)。在DAG運算中,F(xiàn)ILTER、SORT、PROJECT,AGGREGATE等算子都具備該性質(zhì)。
2.2 間接復用
在對FILTER算子進行復用時,除了在算子完全相同時直接復用計算結(jié)果外,還存在間接復用計算結(jié)果的可能性。下面介紹基于FILTER算子匹配的間接復用方法。
例:在所有人中找年齡大于30的人,若在此前已經(jīng)找過年齡大于20的人,且該結(jié)果已經(jīng)緩存,那么這部分被緩存的數(shù)據(jù)仍然可以利用,即便算子Filter(Person.Age > 30)與Filter(Person.Age > 20)并不匹配,如圖1所示。
虛線表示計算上的邏輯需求:為尋找年齡大于30的人,就必須遍歷整個Person集合,而目前已經(jīng)存在一個篩選好的年齡大于20的集合,那么在計算過程中就無需去遍歷全集,只需從大于20的人的集合進行篩選即可,這就減少了I/O占用及實際計算規(guī)模,這種復用方式稱為間接復用。間接復用充分利用了緩存中的小規(guī)模數(shù)據(jù)集,從而使這部分被緩存的數(shù)據(jù)集可以直接復用。在相同的緩存空間下,使得緩存數(shù)據(jù)利用率得以提高。因此,從DAG的復用分析角度來講,間接復用是一種比算子級復用粒度更小的復用手段。若采用間接復用手段,除了算子要具備可合并計算性質(zhì),還要求被復用的算子及其依賴的計算流程具備更特殊的性質(zhì)——包含。這一包含關系可定義為:
對于兩個函數(shù)G1:A→B 和G2:A→B ,若對于任意的數(shù)據(jù)D∈A,若總是有:
則稱函數(shù)G1包含G2。這一包含關系實際上表示了G2的結(jié)果集總是可以由G1的結(jié)果集導出。
對于具備這種形式的可復用計算算子,可采用如下方式進行增量式復用,首先若給定t1時刻的數(shù)據(jù)集I,其輸出為R1,記為:
當G2試圖間接復用該結(jié)果時,即將R1置入緩存,在t2時刻(t2≥ t1),若數(shù)據(jù)集I的增量為Δ,則新的計算結(jié)果可根據(jù)緩存中的R1和Δ導出:
雖然間接復用過程中對緩存數(shù)據(jù)應用了G2算子,但是若被復用的算子具備壓縮數(shù)據(jù)規(guī)模能力,則對于占大比例的復用數(shù)據(jù)而言,G2所需處理的數(shù)據(jù)規(guī)模就大大縮小了,而這實質(zhì)上就減少了每個計算單元的任務量和I/O消耗。
基于此,為實現(xiàn)基于FILTER算子匹配的間接復用,只需找到一種算法來判斷兩個FILTER算子是否具有包含關系即可。對于給定的兩個謂詞表達式P1(x)和P2(x),若滿足:
即二者具備包含關系。對于這樣的一對FILTER算子,若算子FilterP2是新到達的DAG算子,而FilterP1是緩存了的算子,則可根據(jù)復用算法,以間接方式利用緩存中的數(shù)據(jù)。
將兩個FILTER中的謂詞表達式都轉(zhuǎn)換為等價的CNF(Conjunctive Normal Form),設一個FILTER的謂詞表達式F為CNF,則F可表示為多個簡單析取式的合取,即該FILTER的謂詞表達式可表示為如下形式:
對于已經(jīng)按合取運算符AND拆分的簡單析取式,可表示為:
對于F而言,每個簡單析取式所對應的真值集的交集就是F所對應的真值集,由此可得判定F能否包含另一個謂詞表達式F'的充分條件就是:
即對任意的Fi,都存在至少一個Fj'蘊含F(xiàn)i,那么F′蘊含F(xiàn),本文根據(jù)該條件對FILTER算子的包含關系進行判定,證明如下:
對于給定的F和F',若式(9)為真,則有:
基于該判定條件,欲解決該問題,只需對兩個簡單析取式的蘊含關系進行判定即可。
基于上述描述,判斷兩個簡單析取式Fi和Fj之間是否存在蘊含關系的算法如下:
(1)Fi和Fj的所有基本項中的鍵名存在不同,判定為不一定蘊含,退出。
(2)按照表 1計算Fi∧Fj,為真時保留f',為假時用False表示,若結(jié)果不為Fj,判定為不一定蘊含,退出。
(3)判定為Fj'蘊含F(xiàn)i,退出。
由于算法中的(1)和(2)對計算提出了一定的條件,因此其結(jié)果均為不一定蘊含,這意味著該算法仍然為一個充分性的判定。
通過上文給出的判斷兩個FILTER算子是否相互包含的判定算法,就可以在新的DAG計算到達且不存在直接復用條件時,通過FILTER的包含關系來進行間接復用,從而增加數(shù)據(jù)的復用機會。
3 實驗
3.1 環(huán)境
增量條件下的DAG計算要求原始數(shù)據(jù)集是增量的,為此本文設計了一個增量數(shù)據(jù)集,該數(shù)據(jù)集模擬日志類的數(shù)據(jù),每輪實驗結(jié)束后,為該數(shù)據(jù)增長約1.7 GB的數(shù)據(jù)。實驗使用3個節(jié)點,各節(jié)點配置如表 2所示。
實驗平臺為Spark SQL,緩存使用Spark SQL默認的緩存管理器。
3.2 結(jié)果與分析
實驗在開始階段,采樣密度為每次增量都采集一次實驗數(shù)據(jù),在后半段維持增量與計算不變,僅減少采樣頻率。
DAG中的FILTER在本試驗中以隨機方式生成,對全部的實驗負載,覆蓋100%數(shù)據(jù)集計算,66%的實驗負載,覆蓋10%的數(shù)據(jù)集計算,這樣的負載模擬了實際負載中熱點數(shù)據(jù)被更多關注的特點。實驗對直接復用方法和加入FILTER算子匹配的間接復用方法的增量計算進行對比測評,結(jié)果如圖 2所示。
通過使用基于FILTER算子識別的間接復用,相較于直接復用方法,時間開銷平均降低84.91%,具體復用情況見表3。
由此可見,引入基于FILTER算子識別的間接復用可以大幅度提升緩存匹配的成功率,提升緩存利用率及系統(tǒng)整體運行效率。
4 結(jié)語
本文提出了基于FILTER算子匹配的增量式DAG計算復用方法,給出了通過識別FILTER算子的包含關系來達成對FILTER算子更細粒度的增量計算復用手段,并通過實驗驗證了該復用方法可提升增量計算復用中緩存被命中的機會,進而提高計算平臺整體運行性能。
參考文獻:
[1] DEAN J,GHEMAWAT S.MapReduce:simplified data processing on large clusters[J].In Proceedings of Operating Systems Design and Implementation,2004,51(1):107-113.
[2] ZAHARIA M,CHOWDHURY M,DAS T,et al.Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[J].In-Memory Cluster Computing.USENIX Symposium on Networked Systems Design and Implementation,2012,70(2):141-146.
[3] ARMBRUST M,XIN R S,LIAN C,et al.Spark SQL:relational data processing in Spark[M].ACM,2015:1383-1394.
[4] OLSTON C,REED B,SRIVASTAVA U,et al.PigLatin:a not-so-foreign language for data processing[J].Science China Information Sciences,2008(1):1099-1110.
[5] THUSOO A,SARMA J S,JAIN N,et al.Hive:a warehousing solution over a map-reduce framework[J].Proceedings of the Vldb Endowment,2009,2(2):1626-1629.
[6] GHEMAWAT S,GOBIOFF H,LEUNG S T.The Google file system[J].ACM Press,2003(5):29-43.
[7] POPA L,BUDIU M,YU Y,et al.DryadInc:reusing work in large-scale computations[EB/OL].https://link.springer.com/article/10.1007%2Fs13222-012-0109-3.
[8] PENG D,DABEK F.Large-scale incremental processing using distributed transactions and notifications[EB/OL].https://www.hanspub.org/reference/ReferencePapers.aspx?PaperID=9501&ReferenceID=23068.
[9] ELGHANDOUR I,ABOULNAGA A.Restore:reusing results of MapReduce jobs[J].Proceedings of the VLDB Endowment,2012,5(6):586-597.
[10] GUNDA P K,RAVINDRANATH L,THEKKATH C A,et al.Nectar:automatic management of data and computation in datacenters[EB/OL].http://www.doc88.com/p-3149057701380.html.