李 旺 雙 鍇
(北京郵電大學(xué)網(wǎng)絡(luò)技術(shù)研究院 北京 100876)
社交媒體、移動(dòng)設(shè)備及傳感器以前所未有的速度持續(xù)產(chǎn)生著海量數(shù)據(jù),這些數(shù)據(jù)經(jīng)過簡(jiǎn)單的預(yù)處理之后被存儲(chǔ)到分布式數(shù)據(jù)倉(cāng)庫中,用于后期的計(jì)算、分析與挖掘。由于數(shù)據(jù)量巨大,需要采用分布式計(jì)算架構(gòu)對(duì)計(jì)算進(jìn)行拆分后分發(fā)到成百甚至上千臺(tái)機(jī)器上并行執(zhí)行。Flink的出現(xiàn)正好解決了大規(guī)模數(shù)據(jù)計(jì)算問題,相比于MapReduce[1]框架,F(xiàn)link[2]具有流批一體的數(shù)據(jù)處理語義[3]、基于線程的計(jì)算模型和中間結(jié)果無須寫入磁盤等優(yōu)點(diǎn)。Flink針對(duì)Table API[4]提供了執(zhí)行計(jì)劃優(yōu)化模塊,該模塊對(duì)作業(yè)執(zhí)行計(jì)劃優(yōu)化后生成相應(yīng)的物理執(zhí)行計(jì)劃,然后提交到集群運(yùn)行。該模塊提供了靈活的拓展接口,可以對(duì)Flink作業(yè)進(jìn)行自定義優(yōu)化。
在Flink分布式計(jì)算架構(gòu)下,執(zhí)行多表連接(multi-table join)操作時(shí)應(yīng)考慮以下兩個(gè)方面。
① 由于Flink提供了基于線程的輕量級(jí)計(jì)算模型,在集群中可以提供更高的計(jì)算并行度,而用戶編寫的程序在執(zhí)行多表連接時(shí)并不會(huì)考慮到表的大小及關(guān)聯(lián)性等特性。因此,本文需要用算法來優(yōu)化多表連接的并行度,從而提升作業(yè)的整體性能。
② 在連接過程中,需要進(jìn)行大量的數(shù)據(jù)shuffle操作以完成連接計(jì)算,導(dǎo)致過高的網(wǎng)絡(luò)IO代價(jià)。因此,本文需要設(shè)計(jì)一個(gè)算法在并行執(zhí)行多表連接時(shí)盡量減少需要進(jìn)行shuffle操作的數(shù)據(jù)量。
現(xiàn)有的多表連接的優(yōu)化研究主要圍繞MapReduce計(jì)算框架展開,優(yōu)化措施主要包括執(zhí)行計(jì)劃和執(zhí)行框架兩方面的優(yōu)化。由于Flink的性能優(yōu)化和編程模型的差異,已有算法不能充分利用Flink集群的性能優(yōu)勢(shì)。本文提出了一個(gè)適用于Flink的Multi Bushy Tree算法,用于提高多表連接的并行度。在多表連接過程中,已有算法主要致力于尋找最優(yōu)的連接順序以減小中間結(jié)果數(shù)據(jù)的大小,而忽略了利用集群的分布式特點(diǎn)盡可能將多表連接計(jì)算并行化。本文充分考慮了表之間關(guān)聯(lián)鍵的依賴關(guān)系,尋找局部星型連接,將不存在依賴關(guān)系的連接計(jì)算和星型連接中事實(shí)表和維表的連接計(jì)算并行化,從而縮短多表連接作業(yè)的執(zhí)行時(shí)間。
為了進(jìn)一步提高多表連接的速度,本文在Multi Bushy Tree算法的基礎(chǔ)上提出了Semi Join算法。該算法在執(zhí)行星型連接中事實(shí)表和維表的連接計(jì)算時(shí),只對(duì)事實(shí)表中的關(guān)聯(lián)鍵字段和維表數(shù)據(jù)執(zhí)行shuffle操作,連接得到的各中間結(jié)果表以事實(shí)表中的主鍵字段作為關(guān)聯(lián)條件,執(zhí)行連接計(jì)算。由于Flink提供的執(zhí)行計(jì)劃優(yōu)化功能[5],多個(gè)中間結(jié)果表的連接操作可以chain在一個(gè)節(jié)點(diǎn)執(zhí)行。整個(gè)連接過程需要shuffle操作的數(shù)據(jù)量大大減少,可以有效減小網(wǎng)絡(luò)IO代價(jià),提高星型連接的速度。
對(duì)于執(zhí)行計(jì)劃的優(yōu)化,文獻(xiàn)[6]的處理就是將多表連接看作是鏈?zhǔn)竭B接問題,通過將n張表的連接操作拆分成為n-1個(gè)2元連接,每個(gè)2元連接使用一個(gè)MapReduce作業(yè)完成,各MapReduce作業(yè)之間是串行執(zhí)行的。當(dāng)n較大時(shí)會(huì)導(dǎo)致較高的作業(yè)時(shí)間復(fù)雜度和中間結(jié)果的存儲(chǔ)代價(jià),不能充分利用分布式作業(yè)的優(yōu)勢(shì)。文獻(xiàn)[7]利用改進(jìn)的蟻群算法尋找連接樹的較優(yōu)解,在一定程度上避免了局部最優(yōu)解,同時(shí)縮短了搜索時(shí)間。文獻(xiàn)[8]提出的多表連接算法可以在一個(gè)MapReduce作業(yè)中完成所有的連接運(yùn)算,但是隨著參與連接的表增加,這種算法通過網(wǎng)絡(luò)IO進(jìn)行shuffle操作的數(shù)據(jù)量將急劇增加,從而導(dǎo)致較高的網(wǎng)絡(luò)IO代價(jià)。文獻(xiàn)[9]提出的SmartJoin算法雖然可以使用更少的MapReduce作業(yè)數(shù)來實(shí)現(xiàn)鏈?zhǔn)蕉啾磉B接不能完成的大量表之間的連接操作,但是該算法對(duì)表的限制太多,它要求參與連接的表必須包括兩個(gè)大表,其余小表必須存儲(chǔ)在執(zhí)行Reduce任務(wù)的節(jié)點(diǎn)上,參與連接的兩個(gè)大表還必須存在關(guān)聯(lián)鍵。文獻(xiàn)[10]將多表連接規(guī)模劃分為小中大三種類型,對(duì)于不同類型采用不同的連接順序優(yōu)化方法,并引入線性動(dòng)態(tài)規(guī)劃優(yōu)化算法的時(shí)間復(fù)雜度。上述所有算法都是通過優(yōu)化多表連接計(jì)算的連接順序來提高作業(yè)的運(yùn)行效率,沒有考慮將無依賴關(guān)系的連接計(jì)算并行化,無法充分利用Flink集群的節(jié)點(diǎn)資源。
對(duì)于執(zhí)行框架的優(yōu)化,文獻(xiàn)[11]基于MapReduce框架提出在Map-Reduce兩階段之后新增一個(gè)Merge階段,在新增的Merge階段執(zhí)行表連接計(jì)算,從而節(jié)省了一次MapReduce作業(yè)。文獻(xiàn)[12]通過修改MapReduce框架,支持?jǐn)?shù)據(jù)在各節(jié)點(diǎn)之間進(jìn)行管道式傳輸,支持在線聚集和持續(xù)查詢,但是框架修改之后,增加了失敗恢復(fù)(failover)實(shí)現(xiàn)的復(fù)雜度,同時(shí)對(duì)表連接的性能增益有限。文獻(xiàn)[13]將傳統(tǒng)的Sort-Merge Join算法應(yīng)用到大規(guī)模分布式系統(tǒng)中,突破了單機(jī)的內(nèi)存限制,同時(shí)避免了在reduce節(jié)點(diǎn)執(zhí)行兩表連接操作時(shí)進(jìn)行笛卡爾積的計(jì)算。文獻(xiàn)[14]通過計(jì)算得到的關(guān)聯(lián)鍵的布爾分布矩陣,在Map節(jié)點(diǎn)利用矩陣對(duì)表數(shù)據(jù)進(jìn)行過濾,可以有效減小網(wǎng)絡(luò)IO代價(jià),但是該算法對(duì)外連接增益有限而且增加了布爾分布矩陣的計(jì)算時(shí)間。文獻(xiàn)[15]利用廣播變量,在執(zhí)行兩表連接時(shí),將小表存放到廣播變量中,讀取大表數(shù)據(jù)的節(jié)點(diǎn)會(huì)從廣播變量中讀取小表中的全部數(shù)據(jù),進(jìn)而執(zhí)行連接計(jì)算,但該方式要求小表中的數(shù)據(jù)量足夠少,否則對(duì)小表采用復(fù)制廣播的方式依然會(huì)對(duì)網(wǎng)絡(luò)IO產(chǎn)生極大的壓力。Flink基于DataFlow的編程模型[16]與MapReduce實(shí)現(xiàn)存在較大差異,無法將已有執(zhí)行框架優(yōu)化算法直接應(yīng)用到Flink上。
論文主要的設(shè)計(jì)與優(yōu)化目標(biāo)是在使用Flink計(jì)算引擎執(zhí)行多表連接作業(yè)時(shí),提高作業(yè)的運(yùn)行效率。鑒于此,本文從兩個(gè)方面進(jìn)行考慮。
1) 在分布式集群中,計(jì)算的并行度對(duì)作業(yè)的運(yùn)行效率有很大影響,為提高多表連接計(jì)算的并行度,所提出Multi Bushy Tree算法,可以有效地將不存在依賴關(guān)系的連接計(jì)算并行執(zhí)行。
2) 對(duì)于星型連接,本文提出Semi Join算法,通過拆分事實(shí)表的關(guān)聯(lián)鍵的方式減少在連接計(jì)算時(shí)需要shuffle的數(shù)據(jù)量,以此來減小星型連接的網(wǎng)絡(luò)IO代價(jià)。
如圖1所示,多表連接可以用一個(gè)查詢圖G=(V,E)來表示,其中:V是圖中所有節(jié)點(diǎn)的集合,每個(gè)節(jié)點(diǎn)代表一個(gè)參與連接的數(shù)據(jù)表;E是圖中所有邊的集合,如果兩個(gè)節(jié)點(diǎn)之間存在連接邊表示這兩張數(shù)據(jù)表存在關(guān)聯(lián)關(guān)系。圖1(a)的查詢圖包含6個(gè)關(guān)聯(lián)關(guān)系,為6元連接。同理,圖1(b)的查詢圖為8元連接。
(a) Left-deep Tree(b) Right-deep Tree
(a) 6-table join (b) 8-table join圖1 多表連接查詢示例
當(dāng)表的數(shù)量較少時(shí),可以通過窮舉的方式得到一個(gè)最優(yōu)的連接順序。但是當(dāng)參與連接的表的數(shù)量超過一定大小時(shí),該問題則不能得到有效的解決。有研究表明多表連接執(zhí)行計(jì)劃的最優(yōu)解確定是一個(gè)NP-hard問題[18],傳統(tǒng)的做法通常采用連接樹模型限定解的一個(gè)子空間,并設(shè)計(jì)算法從生成的連接樹中找到一個(gè)較優(yōu)解從而確定連接的順序。連接樹可以描述不同的連接方案但最終都會(huì)得到相同的連接結(jié)果。如圖2所示,主流的連接樹模型有Left-deep Tree、Right-deep Tree、Zigzag Tree和Bushy Tree四種。
(c) Zigzag Tree (d) Zigzag Tree圖2 四種不同的連接樹
本文首先對(duì)連接樹中涉及到的概念進(jìn)行說明:用戶輸入的連接條件涉及到的數(shù)據(jù)表稱為基本表,兩個(gè)表進(jìn)行連接計(jì)算后得到的中間結(jié)果稱為中間表。連接樹是一棵二叉樹,葉子節(jié)點(diǎn)永遠(yuǎn)是基本表,內(nèi)部節(jié)點(diǎn)永遠(yuǎn)是中間表,邊表示數(shù)據(jù)的流向。對(duì)于Left-deep Tree、 Right-deep Tree和Zigzag Tree,其內(nèi)部節(jié)點(diǎn)至少有一個(gè)葉子節(jié)點(diǎn)作為子節(jié)點(diǎn),表示每個(gè)中間表都至少由一個(gè)基本表進(jìn)行連接計(jì)算得到。對(duì)于Bushy Tree,有些內(nèi)部節(jié)點(diǎn)的子節(jié)點(diǎn)是不包含葉子節(jié)點(diǎn)的。實(shí)際上其他三種連接樹都是Bushy Tree的一種特殊形式。
Flink提供的基于線程的輕量級(jí)計(jì)算模型,使得分布式計(jì)算作業(yè)能以更高的并行度運(yùn)行。在多表連接方向,目前已有的算法主要是通過減小中間表大小的方式提高作業(yè)的運(yùn)行效率,且都是針對(duì)MapReduce作業(yè),無法充分發(fā)揮Flink的并行計(jì)算優(yōu)勢(shì),因此并不完全適用于Flink計(jì)算作業(yè)。本文提出的算法充分考慮了Flink計(jì)算引擎的優(yōu)點(diǎn),盡量提高多表連接計(jì)算的運(yùn)行效率?;跇?gòu)建得到的較優(yōu)Bushy Tree,分析表之間的依賴關(guān)系,盡可能提高多表連接計(jì)算的并行度。多元連接樹(Multi Bushy Tree)算法詳細(xì)步驟見算法1。
算法1Multi Bushy Tree算法
輸入:JP“連接表的集合”。
輸出:PT“多元連接樹”。
1.BEGIN
2.bushyTree←BuildBushyTree(JP);
3.//將JP根據(jù)表之間的關(guān)聯(lián)關(guān)系構(gòu)建二元Bushy Tree
4.FOR EACHnodeINbushyTreeDO
5.IFshouldMerge(node) THEN
6.node←mergeNode(node);
7.END FOR
8.//如果節(jié)點(diǎn)的所有子節(jié)點(diǎn)(C1,C2,…,Cn)都是基本表且Cn
//基本表的關(guān)聯(lián)鍵來自于C1,合并所有子節(jié)點(diǎn)并替換該節(jié)點(diǎn)
9.FOR EACHnodeINbushyTreeDO
10.IFshouldExecute(node) THEN
11.IFisStarJoin(node) THEN
12.parallelExecuteStarJoin(node);
13.ELSE
14.ParallelExecuteNormalJoin(node);
15.ENDFOR
16.RETURNPT;
17.END
圖3 二元濃密連接樹示意圖
② 尋找局部星型連接。對(duì)于步驟①構(gòu)建得到的二元濃密連接樹,遍歷樹中的每一個(gè)節(jié)點(diǎn),如果該節(jié)點(diǎn)的右孩子節(jié)點(diǎn)表示的基本表中的關(guān)聯(lián)鍵均來自于最左孩子節(jié)點(diǎn)表示的基本表或者中間表,則用所有的孩子節(jié)點(diǎn)替換該節(jié)點(diǎn),將二叉樹變?yōu)槎嗖鏄?。例如,?duì)于步驟①中的查詢示例,在分支一中,連接表C時(shí),連接條件為“A.key2=C.key”,表C中的關(guān)聯(lián)鍵key同樣來自于表A中的key2列,滿足合并的要求,因此可以使用A、B兩個(gè)節(jié)點(diǎn)代替節(jié)點(diǎn)M1,更新后的連接樹見圖4(a)。針對(duì)分支二可以做相同處理,得到的連接樹如圖4(b)所示。而表A連接表D時(shí),由于表D和表A的連接是笛卡爾積操作,因此不滿足合并條件。對(duì)整個(gè)二元連接樹處理之后,得到的最終的多元濃密連接樹(Multi Bushy Tree),如圖4(b)所示。對(duì)于優(yōu)化得到的多元濃密連接樹,如果某個(gè)節(jié)點(diǎn)包含n(n>2)個(gè)子節(jié)點(diǎn),表明該中間結(jié)果表是表(T1,T2,…,Tn)通過星型連接運(yùn)算得到的,其中T1在星型連接中作為事實(shí)表,T2-Tn在星型連接中作為維表。如果某個(gè)節(jié)點(diǎn)只有兩個(gè)子節(jié)點(diǎn),表明該中間結(jié)果表是通過鏈?zhǔn)竭B接或者笛卡爾積運(yùn)算得到。
(a) (b)圖4 多元濃密連接樹
Multi Bushy Tree算法充分考慮表之間的關(guān)聯(lián)性,將不存在依賴關(guān)系的連接計(jì)算并行化,提升多表連接作業(yè)的整體性能。此外,Multi Bushy Tree算法在構(gòu)建的較優(yōu)連接樹基礎(chǔ)上,充分挖掘局部星型連接,并將星型連接中事實(shí)表和各個(gè)維表之間的連接計(jì)算并行化,極大地提高了多表連接計(jì)算的并行度。然而在星型連接中,仍然需要將事實(shí)表中的數(shù)據(jù)通過網(wǎng)絡(luò)IO進(jìn)行多次shuffle以完成和各維表的連接計(jì)算,這對(duì)星型連接的計(jì)算性能仍會(huì)有較大影響。因此,針對(duì)星型連接提出了一個(gè)全新的連接算法減小網(wǎng)絡(luò)IO代價(jià)。
這一部分中,針對(duì)星型連接,本文提出基于Flink計(jì)算引擎可以有效減少需要進(jìn)行shuffle的數(shù)據(jù)量的算法。由于在星型連接中,事實(shí)表通常作為表連接計(jì)算中的左表,此時(shí)如果事實(shí)表和多張數(shù)據(jù)表進(jìn)行連接計(jì)算時(shí)的關(guān)聯(lián)鍵相同,F(xiàn)link提供的執(zhí)行計(jì)劃優(yōu)化器就會(huì)把這些連接計(jì)算合并在一個(gè)節(jié)點(diǎn)運(yùn)行。利用這一特性,本文提出了關(guān)聯(lián)鍵拆分連接(Semi Join)算法,利用該算法對(duì)星型連接優(yōu)化后,只需要對(duì)事實(shí)表、維表和中間結(jié)果表各執(zhí)行一次shuffle操作,可以進(jìn)一步提高星型連接的執(zhí)行效率,減小網(wǎng)絡(luò)IO代價(jià)。詳細(xì)算法步驟見算法2。
算法2Semi Join算法
輸入:JP“星型連接樹”。
輸出:PT“星型連接結(jié)果”。
1.BEGIN
2.FORi=0 TOnumOf(dimTables) DO
3.joinCols←getJoinCols(dimTable);
4.localFactTables[i]←select(factTable,joinCols);
5.ENDFOR
6.//根據(jù)各維表的關(guān)聯(lián)鍵從事實(shí)表中選擇外鍵字段和主鍵
//字段生成局部事實(shí)表集合
7.FORi=0 TOnumOf(factTempTables) DO
8.dimResTables[i]←join(localFactTables[i],dimTables[i]);
9.ENDFOR
10.//對(duì)局部事實(shí)表和維表執(zhí)行連接計(jì)算,得到各維表的連接
//結(jié)果表集合
11.FORi=0 TOnumOf(dimResTables) DO
12.factTable←join(factTable,dimResTables[i]);
13.ENDFOR
14.//各維表的連接結(jié)果表和事實(shí)表以事實(shí)表的主鍵字段作為
//連接條件進(jìn)行連接計(jì)算,得到最終結(jié)果表
15.RETURNfactTable;
16.END
① 生成局部事實(shí)表。通過解析Flink執(zhí)行計(jì)劃,得到事實(shí)表中和維表關(guān)聯(lián)的外鍵字段。使用Flink提供的“select”函數(shù)選擇解析出的外鍵字段和事實(shí)表的主鍵字段作為局部事實(shí)表。例如,查詢命令“SELECT*FROM A,B WHERE A.fKey=B.key”,事實(shí)表A使用fKey字段和維表B進(jìn)行關(guān)聯(lián),因此生成的局部事實(shí)表T中包含兩個(gè)字段:fKey和key(事實(shí)表的主鍵)。由于Flink提供的執(zhí)行計(jì)劃優(yōu)化功能,select計(jì)算和前面的計(jì)算函數(shù)會(huì)合并在一個(gè)節(jié)點(diǎn)中執(zhí)行,因此不會(huì)產(chǎn)生數(shù)據(jù)shuffle。
② 生成各維表的連接結(jié)果表。在步驟①中為每個(gè)維表生成了連接需要的局部事實(shí)表,由于各局部事實(shí)表只包含了事實(shí)表中的用于連接指定維表的外鍵字段和事實(shí)表的主鍵字段,因此每個(gè)局部事實(shí)表的數(shù)據(jù)量都比較小。將生成的局部事實(shí)表和維表按照關(guān)聯(lián)字段進(jìn)行hash shuffle,關(guān)聯(lián)字段相等的數(shù)據(jù)會(huì)通過網(wǎng)絡(luò)IO發(fā)送到相同節(jié)點(diǎn)執(zhí)行連接計(jì)算。由于局部事實(shí)表和維表數(shù)據(jù)量都比較小,采用hash shuffle的方式可以避免將局部事實(shí)表進(jìn)行復(fù)制廣播的開銷,有效減少了需要進(jìn)行shuffle的數(shù)據(jù)量。以步驟①的查詢?yōu)槔?,局部事?shí)表T(key,fKey)和維表D(key,value1,value2),執(zhí)行連接計(jì)算時(shí),表T計(jì)算fKey字段的hash值,對(duì)并行度取模之后發(fā)送到指定節(jié)點(diǎn),表D對(duì)key字段hash取模后發(fā)送到指定節(jié)點(diǎn),表T中fKey和表D中key字段相同的數(shù)據(jù)會(huì)發(fā)送到同一個(gè)節(jié)點(diǎn),這種連接方式保證表中的一條數(shù)據(jù)只會(huì)進(jìn)行一次網(wǎng)絡(luò)IO。
③ 生成最終查詢結(jié)果表。通過步驟(2)得到的各維表的連接結(jié)果表中除了包含各維表中的查詢字段還包含了事實(shí)表中的主鍵字段,事實(shí)表和各連接結(jié)果表通過事實(shí)表中的主鍵執(zhí)行鏈?zhǔn)竭B接計(jì)算從而得到最終的查詢結(jié)果。由于事實(shí)表和各連接結(jié)果表的鏈?zhǔn)竭B接計(jì)算中所有連接的關(guān)聯(lián)鍵都相同,F(xiàn)link提供的執(zhí)行計(jì)劃優(yōu)化功能會(huì)將這些連接計(jì)算chain在一個(gè)節(jié)點(diǎn)中執(zhí)行,所以在本次鏈?zhǔn)竭B接中事實(shí)表和各連接結(jié)果表只需要執(zhí)行一次數(shù)據(jù)shuffle操作。
圖5給出了基于Flink的星型連接優(yōu)化算法Semi Join的執(zhí)行流程。
圖5 星型連接示意圖
利用Semi Join算法優(yōu)化之后的星型連接計(jì)算,步驟①、步驟②和步驟③是串行的,而步驟②中局部事實(shí)表和各維表之間的連接可以并行計(jì)算,因此星型連接的時(shí)間代價(jià)計(jì)算式表示為:
CostlocalFact=Max(C1,C2,…,Cn)
CostdimJoin=Max(R1,R2,…,Rn)
(1)
Cost=CostlocalFact+CostdimJoin+CostfinalJoin
式中:C1,C2,…,Cn表示從事實(shí)表中查詢得到與各維表進(jìn)行連接的局部事實(shí)表的時(shí)間代價(jià),由于SELECT計(jì)算是并行執(zhí)行的,所以該階段時(shí)間代價(jià)取決于最慢的SELECT計(jì)算。R1,R2,…,Rn表示各局部事實(shí)表和各維表連接計(jì)算的時(shí)間代價(jià),各連接計(jì)算在集群中并行執(zhí)行,該階段的時(shí)間代價(jià)同樣取決于最慢的連接計(jì)算的時(shí)間代價(jià)。最后整個(gè)星型連接計(jì)算的時(shí)間代價(jià)為三個(gè)串行計(jì)算的時(shí)間代價(jià)之和。
從圖5可以看出,基于Flink的星型連接優(yōu)化算法Semi Join主要包含兩種類型的連接操作:局部事實(shí)表-維表連接和事實(shí)表-結(jié)果表連接。出于對(duì)計(jì)算性能和存儲(chǔ)性能考慮,算法中的連接計(jì)算采用Hash Join的方式,每次連接都會(huì)進(jìn)行數(shù)據(jù)shuffle操作,因此網(wǎng)絡(luò)IO代價(jià)計(jì)算式表示為:
(2)
Cost=CostdimJoin+CostfinalJoin
式中:L[i]表示第i個(gè)局部事實(shí)表的大小;D[i]表示第i個(gè)維表的大??;R[i]表示第i個(gè)連接結(jié)果表的大?。籉表示事實(shí)表的大小。生成連接結(jié)果表時(shí),需要對(duì)參與連接的局部事實(shí)表和維表進(jìn)行shuffle操作。生成最終結(jié)果表時(shí),需要對(duì)事實(shí)表和各連接結(jié)果表進(jìn)行shuffle操作。整個(gè)星型連接需要shuffle操作的總數(shù)據(jù)量為兩個(gè)階段shuffle操作數(shù)據(jù)量之和。
本文將Multi Bushy Tree+Semi Join與其他兩種多表連接算法進(jìn)行比較。(1) Left-deep Tree連接。Flink提供了基于Left-deep Tree的多表連接方式,連接的順序取決于用戶輸入。(2) Bushy Tree連接。通過Bushy Tree的方式構(gòu)建連接樹,F(xiàn)link可以將不存在依賴關(guān)系的兩表連接計(jì)算并行化。本文實(shí)現(xiàn)了這3種連接算法,并從計(jì)算并行度、作業(yè)運(yùn)行時(shí)間和網(wǎng)絡(luò)IO三個(gè)方面進(jìn)行評(píng)估。
本文搭建了具有20個(gè)節(jié)點(diǎn)的集群,其中一個(gè)節(jié)點(diǎn)被用作Master和ResourceManager,負(fù)責(zé)任務(wù)調(diào)度和資源管理,另外的19個(gè)節(jié)點(diǎn)被用作Worker和TaskManager,所有的計(jì)算任務(wù)都在這些節(jié)點(diǎn)上運(yùn)行。每臺(tái)機(jī)器的硬件配置為4核2.4 GHz的CPU、8 GB內(nèi)存、40 GB機(jī)械硬盤。集群中的每個(gè)節(jié)點(diǎn)均安裝了CentOS 7 64位操作系統(tǒng),使用的Flink版本為原生的Flink 1.9.0,底層使用YARN 2.9.2作為分布式調(diào)度系統(tǒng)。
實(shí)驗(yàn)采用的數(shù)據(jù)集為TPC-H提供的Dbgen工具所生成的模擬數(shù)據(jù)集。為了評(píng)價(jià)三種算法在不同的多表連接方式下的性能差異,實(shí)驗(yàn)生成了A、B、C和D四種數(shù)據(jù)集,數(shù)據(jù)集格式和大小如表1所示,四種數(shù)據(jù)集基本可以覆蓋多表連接的各種情況。
表1 數(shù)據(jù)集格式
在分布式集群中,提高多表連接計(jì)算的并行度可以有效地縮短計(jì)算作業(yè)的運(yùn)行時(shí)間。由于多表連接計(jì)算根據(jù)依賴關(guān)系可以劃分為不同的計(jì)算階段,每個(gè)計(jì)算階段的并行度并不相等,因此采用各階段的平均并行度作為衡量標(biāo)準(zhǔn),設(shè)置每個(gè)兩表連接計(jì)算的并行度為4。表2展示了不同連接算法在執(zhí)行多表連接計(jì)算時(shí)的并行度。
表2 多表連接并行度
對(duì)于數(shù)據(jù)集A,由于表之間不存在局部星型連接,Multi Bushy Tree算法無法在Bushy Tree的基礎(chǔ)上做進(jìn)一步優(yōu)化。在數(shù)據(jù)集B中,Multi Bushy Tree算法可以基于Bushy Tree算法進(jìn)一步并行執(zhí)行局部星型連接中事實(shí)表和維表的連接計(jì)算,所以并行度進(jìn)一步提高。在數(shù)據(jù)集C中,雖然存在兩個(gè)局部星型連接,但是兩者存在關(guān)聯(lián)依賴關(guān)系,無法進(jìn)一步并行化兩個(gè)星型連接計(jì)算,結(jié)果表明Multi Bushy Tree算法的平均并行度仍然高于Bushy Tree算法優(yōu)化后的并行度。在數(shù)據(jù)集D中,由于Multi Bushy Tree算法可以并行執(zhí)行兩個(gè)局部星型連接和星型連接內(nèi)部事實(shí)表和維表的連接計(jì)算,平均并行度進(jìn)一步提高。由于Left-deep Tree算法只能串行執(zhí)行多個(gè)兩表連接計(jì)算,因此在四個(gè)數(shù)據(jù)集中,Multi Bushy Tree和Bushy Tree算法的并行度均高于Left-deep Tree算法。
實(shí)驗(yàn)表明,Multi Bushy Tree算法在多表連接存在局部星型連接的情況下,可以有效提高連接計(jì)算的并行度,并且隨著表數(shù)量的增加,并行度增加更明顯。對(duì)于不存在局部星型連接的情況,利用二元濃密連接樹仍可并行執(zhí)行不存在依賴關(guān)系的兩表連接計(jì)算。
圖6展示了對(duì)于不同數(shù)據(jù)集,每種多表連接算法整體作業(yè)的執(zhí)行時(shí)間。由于在Left-deep Tree算法中,多表連接計(jì)算只能拆分為n-1個(gè)串行執(zhí)行的兩表連接計(jì)算,且有可能較早產(chǎn)生笛卡爾積計(jì)算,整體作業(yè)的執(zhí)行時(shí)間比其他兩種算法高很多。在剩下的兩種多表連接算法中,Multi Bushy Tree算法比Bushy Tree算法的作業(yè)運(yùn)行時(shí)間低很多。主要是因?yàn)殡m然Multi Bushy Tree+Semi Join花費(fèi)了不少時(shí)間用于尋找局部星型連接和生成局部事實(shí)表,但實(shí)驗(yàn)結(jié)果表明提高多表連接計(jì)算的并行度可以極大降低連接作業(yè)的運(yùn)行時(shí)間。從整體上看,Multi Bushy Tree通過尋找局部星型連接并并行化星型連接中事實(shí)表和維表之間的連接計(jì)算,從而降低作業(yè)的運(yùn)行時(shí)間,而Semi Join算法減少了星型連接過程中網(wǎng)絡(luò)IO時(shí)間。
圖6 不同多表連接算法在不同數(shù)據(jù)集下的作業(yè)運(yùn)行時(shí)間
實(shí)驗(yàn)表明,在四種類型的數(shù)據(jù)集中,Multi Bushy Tree算法均表現(xiàn)最好。相比于其他兩種連接算法,在數(shù)據(jù)集A上優(yōu)化效果最差,作業(yè)運(yùn)行時(shí)間縮短0%和20.80%,在數(shù)據(jù)集D上優(yōu)化效果最好,作業(yè)運(yùn)行時(shí)間縮短14.98%和44.78%。在所有數(shù)據(jù)集上,Multi Bushy Tree+Semi Join算法表現(xiàn)最好。
在星型連接中,Semi Join算法通過選擇與各維表進(jìn)行連接的關(guān)聯(lián)鍵字段和事實(shí)表的主鍵字段得到局部事實(shí)表,使用局部事實(shí)表和維表進(jìn)行連接計(jì)算得到各維表的連接結(jié)果表,最后將多個(gè)連接結(jié)果表通過事實(shí)表的主鍵字段進(jìn)行連接計(jì)算得到最終查詢結(jié)果表的方式,避免了對(duì)事實(shí)表進(jìn)行多次shuffle的操作,能夠極大減少通過網(wǎng)絡(luò)IO進(jìn)行shuffle操作的數(shù)據(jù)量。實(shí)驗(yàn)使用數(shù)據(jù)生成工具Dbgen生成了三個(gè)只包含星型連接的數(shù)據(jù)集Q1、Q2和Q3,除了各數(shù)據(jù)集中均包含一張事實(shí)表外,Q1中包含3張維表,Q2中包含5張維表,Q3中包含10張維表。在三個(gè)數(shù)據(jù)集上分別比較Semi Join算法和Flink提供的級(jí)聯(lián)連接算法為完成連接所產(chǎn)生的網(wǎng)絡(luò)IO代價(jià)。實(shí)驗(yàn)結(jié)果表明,Semi Join算法可以有效減少進(jìn)行shuffle的數(shù)據(jù)量。
根據(jù)圖7中的數(shù)據(jù)可知,在星型連接中,維表的數(shù)量越多,Semi Join算法對(duì)于網(wǎng)絡(luò)IO的優(yōu)化效果越明顯。在三個(gè)數(shù)據(jù)集中,網(wǎng)絡(luò)IO的數(shù)據(jù)量分別減少了65.4%、77.6%和89.8%。主要有以下兩個(gè)原因。
圖7 Semi Join和Chain Join產(chǎn)生的網(wǎng)絡(luò)IO數(shù)據(jù)量
(1) Semi Join算法通過生成局部事實(shí)表的方式,使得在和各維表連接時(shí),將對(duì)整個(gè)事實(shí)表shuffle操作的需求轉(zhuǎn)化為對(duì)各局部事實(shí)表的shuffle操作。
(2) 生成最終查詢結(jié)果表時(shí),由于各連接結(jié)果表都是通過事實(shí)表的主鍵進(jìn)行關(guān)聯(lián)的,因此所有的連接計(jì)算都可以chain在一個(gè)節(jié)點(diǎn)運(yùn)行,只需要對(duì)事實(shí)表進(jìn)行一次shuffle操作。
綜上所述,雖然Semi Join算法增加了生成局部事實(shí)表的時(shí)間開銷,但是可以顯著減小需要通過網(wǎng)絡(luò)IO進(jìn)行shuffle操作的數(shù)據(jù)量,可以有效縮短整體作業(yè)的運(yùn)行時(shí)間,且算法對(duì)維表的數(shù)量不敏感。所以Semi Join算法在減少網(wǎng)絡(luò)IO的同時(shí)縮短了星型連接作業(yè)的運(yùn)行時(shí)間。
基于Flink分布式計(jì)算引擎,本文為優(yōu)化多表連接的計(jì)算速度提出優(yōu)化計(jì)算并行度的Multi Bushy Tree算法和Semi Join算法。Multi Bushy Tree算法在二元濃密樹的基礎(chǔ)上,充分考慮了表之間的關(guān)聯(lián)性,在連接樹中尋找局部星型連接,通過并行化不存在依賴關(guān)系的連接計(jì)算和星型連接中事實(shí)表和各維表的連接計(jì)算縮短多表連接作業(yè)的運(yùn)行時(shí)間。此外,Semi Join算法利用Flink提供的執(zhí)行計(jì)劃優(yōu)化功能,大量減少星型連接中需要通過網(wǎng)絡(luò)IO進(jìn)行shuffle操作的數(shù)據(jù)量。實(shí)驗(yàn)結(jié)果表明,與其他連接方法相比,Multi Bushy Tree+Semi Join算法可以大幅度提高多表連接時(shí)的計(jì)算并行度,極大縮短了多表連接作業(yè)的運(yùn)行時(shí)間,有效減小網(wǎng)絡(luò)IO代價(jià)。
由于目前在分布式計(jì)算領(lǐng)域有多種計(jì)算框架,例如Spark、Beam等,都可以提供大規(guī)模分布式集群計(jì)算。在未來的工作中,我們會(huì)將Multi Bushy Tree+Semi Join的多表連接優(yōu)化方案應(yīng)用到更多的計(jì)算框架中,并進(jìn)一步提升算法的性能。