卞昊穹, 陳躍國(guó), 杜小勇, 高彥杰
(1.數(shù)據(jù)工程與知識(shí)工程教育部重點(diǎn)實(shí)驗(yàn)室(中國(guó)人民大學(xué));2.中國(guó)人民大學(xué) 信息學(xué)院,北京 100872)
Spark是繼Hadoop之后出現(xiàn)的通用高性能并行計(jì)算平臺(tái),應(yīng)用于大規(guī)模的分布式數(shù)據(jù)處理.在存儲(chǔ)方面,Spark采用了基于分布式共享內(nèi)存的彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDD)[1]作為數(shù)據(jù)結(jié)構(gòu),同時(shí)兼容HDFS,可將HDFS中已經(jīng)存在的大量現(xiàn)有數(shù)據(jù)作為數(shù)據(jù)源加載到RDD中進(jìn)行處理,并將必要的數(shù)據(jù)在內(nèi)存中進(jìn)行容錯(cuò)的緩存以支持高性能的迭代計(jì)算.在計(jì)算模型方面,Spark與MapReduce相似,但更加靈活,在RDD上提供了更加豐富的操作符及操作符間的組合方式.
由于Spark的高可擴(kuò)展性和較高的數(shù)據(jù)處理性能[2],在大規(guī)模SQL查詢分析正在被研究和應(yīng)用.目前基于Spark的開源SQL查詢分析系統(tǒng)有Shark[3]和Spark SQL[4].在存儲(chǔ)方面,無(wú)論HDFS還是Spark RDD,為了實(shí)現(xiàn)良好的可擴(kuò)展性,都采用了較簡(jiǎn)單的存儲(chǔ)模型,將數(shù)據(jù)以大數(shù)據(jù)塊的方式分布式存儲(chǔ)在集群的各個(gè)節(jié)點(diǎn)上,不支持傳統(tǒng)并行數(shù)據(jù)倉(cāng)庫(kù)中的數(shù)據(jù)預(yù)劃分和數(shù)據(jù)索引;同時(shí)為了簡(jiǎn)化應(yīng)用程序開發(fā),Spark與Hadoop等平臺(tái)的存儲(chǔ)分布性對(duì)上層應(yīng)用透明,這使得基于這類平臺(tái)的數(shù)據(jù)分析系統(tǒng)亦無(wú)法干預(yù)數(shù)據(jù)的分布.
等值連接是結(jié)構(gòu)化數(shù)據(jù)分析中最為常見、代價(jià)最高的操作之一,在傳統(tǒng)并行分析型數(shù)據(jù)庫(kù)中對(duì)等值連接操作的優(yōu)化大多基于數(shù)據(jù)預(yù)劃分,無(wú)法在Spark中實(shí)現(xiàn).在Shark和Spark SQL中使用最多是Broadcast Join和Repartition Join.Broadcast Join局限性大,網(wǎng)絡(luò)和內(nèi)存開銷大,計(jì)算復(fù)雜度較高;Repartition Join則要求在查詢執(zhí)行時(shí)對(duì)參與連接的兩表作重新劃分,網(wǎng)絡(luò)和內(nèi)存開銷也很大.在結(jié)構(gòu)化大數(shù)據(jù)分析中,事實(shí)表和維表的數(shù)據(jù)量可能都很大,為了提高數(shù)據(jù)分析的實(shí)時(shí)性,需要對(duì)現(xiàn)有的Spark上的等值連接算法進(jìn)行優(yōu)化.
本文針對(duì)大數(shù)據(jù)分析中事實(shí)表與維表,尤其是大維表的等值連接提出了基于Spark優(yōu)化的連接算法.該算法首先將事實(shí)表在連接屬性上進(jìn)行以數(shù)據(jù)塊為單位的去重,然后將按塊去重后的事實(shí)表與維表進(jìn)行預(yù)連接,預(yù)連接通過將去重后的事實(shí)表和維表基于一致性哈希的原理進(jìn)行劃分,使之在集群中并行完成,避免了可能的數(shù)據(jù)傾斜問題,連接結(jié)果與事實(shí)表再進(jìn)行一次組裝得到最終連接結(jié)果.經(jīng)過理論分析和實(shí)驗(yàn)驗(yàn)證,本文算法較目前Spark上性能最好的SQL數(shù)據(jù)分析系統(tǒng)Spark SQL[3]的連接性能提高了1-2倍,且具有很好的可擴(kuò)展性.
基于Spark/MapReduce的大數(shù)據(jù)分析中,常用的連接算法有Simi-Join及其變種(如Per-split Simi-Join)、Broadcast Join、Repartition Join 等[5-6].其 中 Simi-Join 和 Broadcast Join局限性較大,通常性能較差,Repartition Join適用性最好,在絕大多數(shù)情況下具有最好的性能.但Repartition Join在Spark上具有如下的缺點(diǎn).
(1)需要在查詢時(shí)對(duì)數(shù)據(jù)進(jìn)行動(dòng)態(tài)的重劃分,通信量較大,尤其在寬表的情況下;
(2)通過Hash函數(shù)劃分到同一節(jié)點(diǎn)的很多事實(shí)表元組在外鍵上具有相同值,內(nèi)存和計(jì)算資源消耗較大.
針對(duì)以上問題,本文提出了一種對(duì)現(xiàn)有的Spark上的連接的優(yōu)化算法.本文中所用到的符號(hào)描述如表1.
定義1 連接并行度:連接時(shí)連接屬性哈希分區(qū)的個(gè)數(shù).
算法的執(zhí)行流程包括如下兩個(gè)階段.
階段一:動(dòng)態(tài)優(yōu)化與預(yù)連接
表1 符號(hào)列表Tab.1 Frequently used symbols
該階段根據(jù)Fact和Dim的元組數(shù)量計(jì)算出連接的并行度Pd,并計(jì)算出Fact的連接屬性值Key的無(wú)重集FactUK,F(xiàn)actUK中的每一個(gè)元素除了包含Key外,還記錄了Key對(duì)應(yīng)于Fact元組的存儲(chǔ)位置.將FactUK與Dim根據(jù)并行度Pd進(jìn)行哈希連接,得到結(jié)果JoinedUK,這一過程為預(yù)連接.由于FactUK與Dim 的數(shù)據(jù)量都遠(yuǎn)小于Fact,因此預(yù)連接的網(wǎng)絡(luò)I/O、內(nèi)存開銷和CPU開銷遠(yuǎn)小于Fact和Dim的Hash連接.本文中預(yù)連接所用的哈希函數(shù)借鑒了Dynamo、Cassandra等NoSQL數(shù)據(jù)庫(kù)中用于數(shù)據(jù)分布的一致性哈希算法的思想[7-8],避免了現(xiàn)有系統(tǒng)中所采用的簡(jiǎn)單哈希函數(shù)受到數(shù)據(jù)傾斜影響較大、造成連接時(shí)負(fù)載不均衡的問題,且查詢時(shí)動(dòng)態(tài)計(jì)算出的并行度更加合理,使得預(yù)連接的效率得到了進(jìn)一步的提高.JoinedUK中同樣包含Key對(duì)應(yīng)于Fact元組的存儲(chǔ)位置.
階段二:組裝連接結(jié)果
該階段根據(jù)JoinedUK中記錄的Fact中元組的存儲(chǔ)位置將JoinedUK與Fact進(jìn)行組裝,得到最終的連接結(jié)果.這一階段無(wú)須再進(jìn)行網(wǎng)絡(luò)I/O和內(nèi)存、CPU開銷巨大的Hash連接.JoinedUK的數(shù)據(jù)量不大于FactUK,在部分實(shí)際的查詢中,JoinedUK的數(shù)據(jù)量遠(yuǎn)小于FactUK,這一階段僅需將JoinedUK根據(jù)Fact中元組的存儲(chǔ)位置與Fact進(jìn)行組裝,代價(jià)很小.
本文的等值連接算法在Spark中可以高效地實(shí)現(xiàn).下面對(duì)算法各階段中的一些過程作細(xì)節(jié)描述.
Fact中的元組需要在連接屬性Key上進(jìn)行去重(投影).但是由于Fact是分布式存儲(chǔ)在集群中的,需要對(duì)Fact進(jìn)行shuffle和reduce才能完成去重,F(xiàn)act數(shù)據(jù)量巨大,這樣去重的代價(jià)很高.實(shí)驗(yàn)分析發(fā)現(xiàn),由于實(shí)際數(shù)據(jù)分析中采用的數(shù)據(jù)塊通常較大(64MB整數(shù)倍),塊內(nèi)包含的元組通常很多,加之?dāng)?shù)據(jù)的分布通常具有一定的局部性,因此重復(fù)數(shù)據(jù)以數(shù)據(jù)塊為單位進(jìn)行去重也可以達(dá)到較好的效果.所以本算法的去重分為兩個(gè)階段,第一階段先以數(shù)據(jù)塊為單位進(jìn)行去重,其結(jié)果參與預(yù)連接,在預(yù)連接的過程中,重復(fù)的連接數(shù)據(jù)會(huì)劃分到相同的分區(qū)中,從而進(jìn)一步去重.Spark的mapPartitions操作[2]用于實(shí)現(xiàn)以數(shù)據(jù)塊為粒度的計(jì)算,可用于實(shí)現(xiàn)數(shù)據(jù)塊級(jí)別的去重.
連接并行度是Repartition Join的一項(xiàng)重要的參數(shù),該參數(shù)設(shè)置過大或過小都會(huì)一定程度地影響連接的性能.研究者在Shark的研究實(shí)現(xiàn)中提出了動(dòng)態(tài)的參數(shù)優(yōu)化[9],在Hive等SQL-on-Hadoop系統(tǒng)中,查詢優(yōu)化是查詢執(zhí)行之前基于數(shù)據(jù)的靜態(tài)統(tǒng)計(jì)信息進(jìn)行的[10],Shark中在查詢運(yùn)行時(shí)的動(dòng)態(tài)參數(shù)調(diào)整給Shark帶來(lái)了性能的提升.本文中借鑒了動(dòng)態(tài)優(yōu)化的思想,將連接并行度的設(shè)定在連接過程中完成.因?yàn)樵谶B接執(zhí)行之前難以估計(jì)參與連接的實(shí)際數(shù)據(jù)量,在查詢執(zhí)行前的靜態(tài)查詢優(yōu)化階段難以準(zhǔn)確地設(shè)定這一參數(shù).本文中的連接并行度通過式(1)所示的函數(shù)getPd計(jì)算得出.
Pd=getPd(FactUK,Dim)= min(FactUK.partitionNum+Dim.partitionNum,ω)(1)其中,partitionNum表示數(shù)據(jù)塊的個(gè)數(shù),ω用于控制并行度的最大值,在實(shí)際的數(shù)據(jù)分析系統(tǒng)中,往往有多種數(shù)據(jù)分析和處理負(fù)載共享系統(tǒng)中的資源,控制并行度可以防止當(dāng)連接操作對(duì)資源的過度侵占.實(shí)驗(yàn)發(fā)現(xiàn),當(dāng)集群中沒有其他負(fù)載時(shí),ω取分配給Spark集群的CPU核數(shù)的2倍左右可以得到最好的連接性能;在Spark中數(shù)據(jù)集(RDD)的partitionNum可以快速獲取.
經(jīng)過連接屬性上的去重之后,F(xiàn)actUK中的元素個(gè)數(shù)和數(shù)據(jù)量都遠(yuǎn)小于Fact,但FactUK和Dim的數(shù)據(jù)量仍然可能超過單機(jī)處理能力,且仍可能存在數(shù)據(jù)傾斜.在預(yù)連接中,本文借鑒了一致性哈希[7]的思想對(duì)FactUK和Dim進(jìn)行劃分,具體流程為如下.
(1)對(duì)FactUK和Dim中的Key取并集(不去重),并根據(jù)采樣率φ進(jìn)行采樣,得到樣本sampleUK.
(2)計(jì)算sampleUK中Key的Hash值(如果Key為整型,Hash值可以是其本身),并將Hash空間劃分為Pd個(gè)區(qū)間,保證每個(gè)區(qū)間中的樣本數(shù)基本相等.
(3)將FactUK和Dim按照(2)中計(jì)算出的區(qū)間Hash劃分到各個(gè)區(qū)間中,這些區(qū)間的物理位置分布在集群的多個(gè)節(jié)點(diǎn)上.
Spark中支持RDD上的union和sample[2].其中union是兩個(gè)元素類型相同的RDD上的并操作,該操作并不會(huì)移動(dòng)兩個(gè)RDD的數(shù)據(jù),也不會(huì)進(jìn)行去重,只是將RDD的元數(shù)據(jù)合并;sample是RDD上的采樣操作,在各個(gè)partition上并行進(jìn)行.這兩個(gè)操作代價(jià)都很低,可快速完成(1);而利用Spark中的 RangePartitioner[2]可以快速完成(2)和(3).
FactUK和Dim一致性哈希劃分之后,相關(guān)的集群節(jié)點(diǎn)并行地對(duì)各個(gè)分片進(jìn)行連接,得到連接結(jié)果JoinedUK,其中每個(gè)元素包含了一個(gè)Key和Dim中對(duì)應(yīng)于該Key的元組,以及該Key對(duì)應(yīng)的FactRdd分區(qū)號(hào)的列表.由于劃分后的數(shù)據(jù)緩存在各個(gè)集群節(jié)點(diǎn)的內(nèi)存中,且充分利用了多節(jié)點(diǎn)和節(jié)點(diǎn)上多核的并行計(jì)算能力,預(yù)連接的速度很快.預(yù)連接的目的是進(jìn)一步去重并排除沒有連接結(jié)果的Key.在實(shí)際的大規(guī)模數(shù)據(jù)分析中,相當(dāng)一部分表連接查詢中,都存在大量的Key上沒有連接結(jié)果,浪費(fèi)了大量網(wǎng)絡(luò)I/O和計(jì)算資源.
Spark中的cogroup操作可以實(shí)現(xiàn)將兩個(gè)數(shù)據(jù)集按照Key快速分組,對(duì)分組結(jié)果集作簡(jiǎn)單的過濾操作即可保留有連接的Key,完成預(yù)連接.
預(yù)連接結(jié)果JoinedUK和Fact再做一次連接即可得到最終的連接結(jié)果.但這樣需要廣播JoinedUK,代價(jià)較高.本文根據(jù)JoinedUK中包含的Key對(duì)應(yīng)的Fact的分區(qū)號(hào),將JoinedUK按照Fact的分區(qū)號(hào)做重新的劃分,由于JoinedUK的數(shù)據(jù)量較小,劃分代價(jià)較低.劃分后的結(jié)果和Fact具有相同的分區(qū)數(shù)且各分區(qū)和Fact一一對(duì)應(yīng),因此組裝的過程以分區(qū)為粒度在集群中并行完成,效率很高.組裝完成后即得到最終的連接結(jié)果,可繼續(xù)在連接結(jié)果上完成查詢計(jì)劃中的其他操作.
在Spark中通過自定義的Partitioner可以快速將JoinedUK按照Fact的分區(qū)號(hào)進(jìn)行劃分,劃分得到的結(jié)果集具有和Fact相同的分區(qū)(數(shù)據(jù)塊)數(shù),之后通過Spark中的zipPartitions操作將Fact和JoinedUK做快速的組裝,這一過程無(wú)需在網(wǎng)絡(luò)上傳輸Fact中的數(shù)據(jù).
Spark是基于分布式共享內(nèi)存的分布式計(jì)算平臺(tái),在數(shù)據(jù)的處理過程中,數(shù)據(jù)可以始終保持在內(nèi)存中.本文假設(shè)參與連接的事實(shí)表和維表上的中間結(jié)果數(shù)據(jù)可以被緩存在內(nèi)存中,因此,連接的代價(jià)主要來(lái)自網(wǎng)絡(luò)I/O、內(nèi)存空間占用和CPU計(jì)算,這也是內(nèi)存計(jì)算中性能考量的三個(gè)主要方面.
本文的代價(jià)模型從網(wǎng)絡(luò)I/O代價(jià)、計(jì)算復(fù)雜度、內(nèi)存空間代價(jià)三個(gè)方面建立的.
1.網(wǎng)絡(luò)I/O代價(jià)
本文的連接算法中,事實(shí)表去重和計(jì)算連接并行度過程中幾乎沒有網(wǎng)絡(luò)I/O的開銷,網(wǎng)絡(luò)I/O開銷來(lái)自預(yù)連接過程的數(shù)據(jù)劃分和連接結(jié)果的組裝,如式(2)所示.
其中NetCostjp是預(yù)連接數(shù)據(jù)劃分(RangePartition)的網(wǎng)絡(luò)I/O代價(jià),用數(shù)據(jù)量表示為式(3).
由于去重后的FactUK中元組個(gè)數(shù)少于Fact,且每個(gè)元組僅包含一個(gè)Key值及其對(duì)應(yīng)的分區(qū)號(hào),因此FactUK的數(shù)據(jù)量遠(yuǎn)小于Fact,即Size(FactUK)=α·Size(Fact),0<α?1.Dim的數(shù)據(jù)量也遠(yuǎn)小于Fact,即Size(Dim)=β·Size(Fact),0<β?1.
NetCosta是結(jié)果組裝的網(wǎng)絡(luò)I/O代價(jià),其中包括將預(yù)連接進(jìn)行一次劃分和在組裝過程中跨節(jié)點(diǎn)讀取對(duì)應(yīng)分區(qū)的通信代價(jià),而這兩個(gè)子過程的通信代價(jià)不會(huì)超過NetCostjp,通常由于預(yù)連接之后相當(dāng)于對(duì)Fact在連接屬性上做了全局的去重且排除了不能連接的元組,NetCosta會(huì)遠(yuǎn)低于NetCostjp.假設(shè)參與連接的節(jié)點(diǎn)數(shù)為N,則總的通信量將由這些節(jié)點(diǎn)分?jǐn)偅虼吮疚乃惴ǖ木W(wǎng)絡(luò)I/O代價(jià)估算結(jié)果如式(4)所示.
2.計(jì)算復(fù)雜度
算法中計(jì)算分區(qū)、Hash探索、判斷等值等基本運(yùn)算的執(zhí)行次數(shù)都是和數(shù)據(jù)規(guī)模呈線性關(guān)系的.假設(shè)Fact中的元組數(shù)為n,Dim中的元組數(shù)為m,由于n?m,則算法的計(jì)算復(fù)雜度為O(n).
3.內(nèi)存空間代價(jià)
算法中,除了事先緩存在內(nèi)存中的參與連接的數(shù)據(jù)外,還需要緩存一些中間結(jié)果以加快計(jì)算,且連接過程中,需要將參與連接的數(shù)據(jù)在內(nèi)存中建立Hash表、進(jìn)行哈希探索,故需要一定的內(nèi)存空間消耗.
由于FactUK需要用于采樣以確定預(yù)連接時(shí)的Hash空間劃分并且要參與預(yù)連接,在Spark中對(duì)多次使用的數(shù)據(jù)進(jìn)行緩存可以提高計(jì)算的效率,因此在算法實(shí)現(xiàn)中對(duì)FactUK進(jìn)行了緩存.此外在預(yù)連接中需要將劃分后的FactUK和Dim在內(nèi)存中建立Hash表,完成連接.雖然這兩個(gè)過程是依次進(jìn)行的,但由于Spark的機(jī)制問題,在預(yù)連接開始之前,被緩存的FactUK數(shù)據(jù)難以被及時(shí)釋放,故算法的內(nèi)存代價(jià)應(yīng)為兩者之和.FactUK通常比Fact小很多,所以算法的內(nèi)存代價(jià)如式(5)所示.
基于Spark/MapReduce的大數(shù)據(jù)分析常用的連接算法中,Simi-Join及其變種性能較差,適用性較差,僅在特殊的條件下才使用,目前在Spark上沒有其系統(tǒng)實(shí)現(xiàn),因此本文不做與Simi-Join的對(duì)比分析.而Broadcast Join也僅適用于維表很小的情況下,在Spark SQL等基于Spark的大數(shù)據(jù)分析系統(tǒng)中并不常用.假設(shè)事實(shí)表Fact中的元組數(shù)為n,每個(gè)數(shù)據(jù)塊中存儲(chǔ)的元組數(shù)為bf,維表Dim中的元組數(shù)為m,每個(gè)數(shù)據(jù)塊中存儲(chǔ)的元組數(shù)為bd,其中bf和bd是常數(shù).則有:
(1)Broadcast Join:的每個(gè)節(jié)點(diǎn)的平均網(wǎng)絡(luò)通信量為Size(Dim),遠(yuǎn)大于本文算法;計(jì)所需內(nèi)存空間等于維表的數(shù)據(jù)量乘以事實(shí)表的數(shù)據(jù)塊數(shù),即其中bs為事實(shí)表數(shù)據(jù)塊大小,通常為64 MB整數(shù)倍.
(2)Repartition Join:在查詢執(zhí)行時(shí)需要將維表和事實(shí)表的數(shù)據(jù)進(jìn)行劃分,所以每個(gè)節(jié)計(jì)算復(fù)雜度同樣為O(n);所需內(nèi)存空間為Size(Fact)+Size(Dim).
本文算法與Broadcast Join、Repartition Join的對(duì)比如表2所示.
表2 等值連接算法代價(jià)對(duì)比Tab.2 Cost comparison of equi-join algorithms
可見,網(wǎng)絡(luò)I/O代價(jià)方面,由于N為參與連接的節(jié)點(diǎn)數(shù),通常與集群規(guī)模有關(guān),是一個(gè)較大的正整數(shù),而0<α+β?1,所以本文算法網(wǎng)絡(luò)I/O代價(jià)遠(yuǎn)低于Broadcast Join和Repartition Join;計(jì)算復(fù)雜度方面,本文算法和Repartition Join為同一數(shù)量級(jí),低于Broadcast Join;內(nèi)存空間代價(jià)方面,由于為事實(shí)表數(shù)據(jù)塊數(shù),通常是一個(gè)較大的正整數(shù),所以本文算法的內(nèi)存空間代價(jià)遠(yuǎn)低于Broadcast Join和Repartition Join.
本文在Spark上實(shí)現(xiàn)了所述的連接算法,并以TPC-DS[11-12]作為測(cè)試基準(zhǔn)來(lái)測(cè)試連接算法的性能,與Spark SQL和Shark進(jìn)行了對(duì)比.
實(shí)驗(yàn)的軟件環(huán)境如表3所示.
表3 軟件環(huán)境Tab.3 Software environment
本文的實(shí)驗(yàn)在實(shí)驗(yàn)室的云平臺(tái)上完成,所用虛擬機(jī)集群的配置如表4所示.
表4 集群配置Tab.4 Cluster settings
所用的測(cè)試數(shù)據(jù)為TPC-DS 100GB和300GB數(shù)據(jù)集中最大的一張事實(shí)表Store_Sales和最大的一張維表Customer,存儲(chǔ)在HDFS上.測(cè)試所用的兩張表的數(shù)據(jù)量占數(shù)據(jù)集總數(shù)據(jù)量的40%左右,如表5所示.Store_Sales中的外鍵ss_customer_sk與Customer中的主鍵c_customer_sk可以連接,連接時(shí)每個(gè)表除聯(lián)接屬性外,各使用了6個(gè)32位整型列.
表5 測(cè)試表的數(shù)據(jù)量Tab.5 Data volume of testing table
定義2 Fact與Dim連接選擇率:Dim中能與Fact產(chǎn)生連接結(jié)果的元組數(shù)占Dim總元組數(shù)的比率.
在實(shí)驗(yàn)中,取連接選擇率為70%,采用TPC-DS 300GB數(shù)據(jù)集,集群節(jié)點(diǎn)數(shù)為16,每個(gè)節(jié)點(diǎn)給Spark分配3個(gè)核,測(cè)試連接并行度Pd對(duì)連接執(zhí)行時(shí)間的影響,結(jié)果如圖1所示.
圖1 連接并行度對(duì)連接執(zhí)行時(shí)間的影響Fig.1 Impact of parallelism degree on join execution time
可見當(dāng)Pd取分配給Spark集群的CPU核數(shù)的2倍,即96時(shí)可以得到最好的連接性能,并且Pd在該值周圍時(shí),查詢執(zhí)行時(shí)間變化不大.
在實(shí)驗(yàn)中,取Pd為分配給Spark集群的CPU核數(shù)的2倍.通過人為加入隨機(jī)因數(shù)改變兩表的連接選擇率來(lái)驗(yàn)證在不同的連接選擇率下連接算法的性能.實(shí)驗(yàn)結(jié)果如圖2所示.
圖2 連接執(zhí)行時(shí)間對(duì)比Fig.2 Join execution time comparison
圖2中的執(zhí)行時(shí)間是5次執(zhí)行取得的平均值,單位為s,保留小數(shù)點(diǎn)后兩位.根據(jù)查詢計(jì)劃分析,Spark SQL中使用的是Repartition Join算法,而Shark中使用的是Broadcast Join算法.實(shí)驗(yàn)中測(cè)試了不同連接選擇率下的連接操作執(zhí)行時(shí)間,Shark的執(zhí)行時(shí)間全部超過1 800s或者報(bào)錯(cuò),因此執(zhí)行時(shí)間沒有標(biāo)注在圖中.Spark SQL使用的Repartition Join對(duì)連接選擇率不敏感.本文中的連接算法隨著連接選擇率的下降,執(zhí)行時(shí)間有所下降,并趨向一個(gè)穩(wěn)定值,這個(gè)穩(wěn)定值是算法執(zhí)行過程中預(yù)連接的執(zhí)行時(shí)間.在最壞情況下,即連接選擇率為100%時(shí),本文連接算法的執(zhí)行速度仍然比目前性能最好的Hash Join高出1倍.
通過圖2(a)和(b)的對(duì)比,當(dāng)集群中節(jié)點(diǎn)數(shù)量增加1倍(8個(gè)增加到16個(gè)),數(shù)據(jù)量增加2倍(288M行到864M行),擴(kuò)展性最理想情況下執(zhí)行時(shí)間增加50%,本文連接算法的平均執(zhí)行時(shí)間(5種連接選擇率下執(zhí)行時(shí)間的平均值)增加63.7%,而Spark SQL的Repartition Join平均執(zhí)行時(shí)間增加63.1%,可見本文的算法和Repartition Join都具有較好的可擴(kuò)展性.
隨著Spark等大規(guī)模集群式的內(nèi)存計(jì)算框架在大數(shù)據(jù)分析中的普及,交互式的大數(shù)據(jù)分析成為必然的趨勢(shì).連接性能是交互式大數(shù)據(jù)分析的主要瓶頸.本文對(duì)Spark/Hadoop上現(xiàn)有的等值連接算法進(jìn)行了分析研究,提出了一種改進(jìn)的等值連接算法.該算法具有很好的適用性,在事實(shí)表和大維表的連接中表現(xiàn)出良好的性能,比現(xiàn)有系統(tǒng)中的連接算法性能高出1~2倍.并且隨著連接選擇率的降低,算法的性能會(huì)進(jìn)一步提高.
本文的算法可以作為等值連接的操作符加入到現(xiàn)有的Spark SQL等數(shù)據(jù)分析系統(tǒng)中,提高現(xiàn)有數(shù)據(jù)分析系統(tǒng)的性能.
[1] ZAHARIA M,CHOWDHURY M,F(xiàn)RANKLIN M J,et al.Spark:cluster computing with working sets[C]//HotCloud2010.USENIX Association Berkeley,CA:[s.n.],2010:10-10.
[2] Spark[OL].http://spark.apache.org/.
[3] Shark[OL].http://shark.cs.berkeley.edu/.
[4] Spark SQL[OL].http://spark.apache.org/sql/.
[5] BLANAS S,PATEL J M,ERCEGOVAC V,et al.A comparison of join algorithms for log processing in MaPreduce[C]//SIGMOD2010.New York:ACM,2010:975-986.
[6] SAKR S,ANNALIU,F(xiàn)AYOUMI A G.The Family of MapReduce and Large-Scale Data Processing Systems[J].ACM Computing Surveys(CSUR),2013,46(1).
[7] KARGER D,LEHMAN E,LEIGHTON T,et al.Consistent hashing and random trees:distributed caching protocols for relieving hot spots on the world wide Web[C]//STOC97.New York:ACM,1997:654-663.
[8] DECANDIA G,HASTORUN D,JAMPANI M,et al.Dynamo:Amazon’s highly available key-value Store[C]//SOSP2007.New York:ACM,2007:205-220.
[9] XIN R S,ROSEN J,ZAHARIA M,et al.Shark:SQL and rich analytics at scale[C]//SIGMOD2013.New York:ACM,2013:13-24.
[10] THUSOO A,SARMA J S,JAIN N,et al.Hive:a warehousing solution over a map-reduce framework[J].PVLDB,2009,2(2):1626-1629.
[11] OTHAYOTH R,POESS M.The making of TPC-DS[C]//VLDB2006.New York:ACM ,2006:1049-1058.
[12] POESS M,NAMBIAR R O,WALRATH D.Why you should run TPC-DS:a workload analysis[C]//VLDB2007.New York:ACM ,2007:1138-1149.
華東師范大學(xué)學(xué)報(bào)(自然科學(xué)版)2014年5期