趙麗梅 黃小菊 宮學(xué)慶
(華東師范大學(xué)軟件工程學(xué)院 上海 200062)
Apache Spark[1]是一個(gè)通用的并行查詢引擎,能夠支持對(duì)鍵值數(shù)據(jù)源的數(shù)據(jù)分析處理,擴(kuò)展鍵值數(shù)據(jù)庫[2-3]大規(guī)模復(fù)雜查詢分析的能力,例如對(duì)鍵值數(shù)據(jù)源的Join查詢。實(shí)際應(yīng)用場景中常搭建的Spark-over-HBase架構(gòu)[4],利用Spark查詢引擎支持HBase數(shù)據(jù)源[5-6]的復(fù)雜查詢,其存儲(chǔ)層利用HBase集群對(duì)海量數(shù)據(jù)進(jìn)行持久化存儲(chǔ),計(jì)算層采用Spark查詢引擎來執(zhí)行大規(guī)模查詢分析。該架構(gòu)實(shí)現(xiàn)存儲(chǔ)層與計(jì)算層分離,解決了HBase數(shù)據(jù)庫僅能通過鍵來進(jìn)行簡單查詢的不足,擴(kuò)展了其支持復(fù)雜查詢的能力。如圖1所示,該架構(gòu)底層采用HBase集群來進(jìn)行數(shù)據(jù)持久化存儲(chǔ),HBase集群包括一個(gè)HMaster主節(jié)點(diǎn)和若干個(gè)HRegionServer節(jié)點(diǎn),利用Zookeeper集群來進(jìn)行分布式協(xié)調(diào)。數(shù)據(jù)分布存儲(chǔ)在多個(gè)HRegionServer節(jié)點(diǎn),通過主節(jié)點(diǎn)來協(xié)調(diào)各HRegionServer的負(fù)載均衡并維護(hù)集群的狀態(tài);上層利用Spark集群來實(shí)現(xiàn)對(duì)于數(shù)據(jù)的并行計(jì)算和查詢響應(yīng),Spark集群包括一個(gè)Master主節(jié)點(diǎn)和若干個(gè)Worker節(jié)點(diǎn),通過主節(jié)點(diǎn)來管理Worker節(jié)點(diǎn),用戶提交應(yīng)用程序啟動(dòng)Driver進(jìn)程來觸發(fā)集群工作。當(dāng)接收到一個(gè)SQL請求后,啟動(dòng)Driver進(jìn)程,Spark查詢引擎生成對(duì)應(yīng)的任務(wù)調(diào)度,通過數(shù)據(jù)訪問接口提取HBase中的數(shù)據(jù)到各Worker節(jié)點(diǎn)進(jìn)行數(shù)據(jù)并行處理,響應(yīng)查詢請求。
圖1 Spark-over-HBase架構(gòu)
Spark-over-HBase架構(gòu)擴(kuò)展了HBase數(shù)據(jù)庫的大規(guī)模查詢分析能力,但是對(duì)于復(fù)雜查詢分析中較常用、開銷較大的Join操作,Spark查詢引擎的處理過程仍然有以下兩方面不足:
(1) 網(wǎng)絡(luò)傳輸開銷大。根據(jù)文獻(xiàn)[7]可知,因?yàn)榇鎯?chǔ)模式簡單,所以大多數(shù)的鍵值數(shù)據(jù)庫不具備在存儲(chǔ)層進(jìn)行謂詞下推和投影來過濾數(shù)據(jù),掃描數(shù)據(jù)效率低下。所以,大多情況下Spark查詢引擎需要提取全表數(shù)據(jù)進(jìn)行處理。對(duì)于Join操作,Spark底層實(shí)現(xiàn)了Broadcast Hash Join算法、Repartition Join算法[8]。針對(duì)兩個(gè)大表進(jìn)行Join的Repartition Join算法,是將兩表的全表數(shù)據(jù)進(jìn)行Shuffle操作[9]。Spark當(dāng)前采用的Shuffle策略是Sort Based Shuffle,這涉及大量的磁盤I/O開銷和計(jì)算開銷,尤其是網(wǎng)絡(luò)傳輸開銷。對(duì)于寬表或者網(wǎng)絡(luò)通信效率低下的情況,該部分開銷占比更大。
(2) 并行度設(shè)置不合理。在硬件資源滿足條件的情況下,并行處理大數(shù)據(jù)量往往能夠極大地提高工作性能。在Spark執(zhí)行Join查詢時(shí),Join執(zhí)行的并行度與進(jìn)行Shuffle操作時(shí)重分區(qū)的個(gè)數(shù)有關(guān),即(1)中Reduce任務(wù)的個(gè)數(shù)。Spark提供spark.sql.shuffle.partitions參數(shù)來設(shè)定Shuffle時(shí)分區(qū)的個(gè)數(shù)。一般情況下,各Join實(shí)現(xiàn)算法取該參數(shù)的默認(rèn)值。同時(shí),用戶也可根據(jù)生產(chǎn)環(huán)境配置手動(dòng)調(diào)整該參數(shù),但是手動(dòng)調(diào)整人工代價(jià)大且難以做出正確估計(jì),很難給出最佳參數(shù)配置。
如何減少網(wǎng)絡(luò)傳輸代價(jià)、合理設(shè)置Join的并行度,提升現(xiàn)有的Spark-over-HBase平臺(tái)下的大表Join性能對(duì)于大規(guī)模數(shù)據(jù)查詢分析意義重大。
基于此,本文重點(diǎn)從優(yōu)化算法流程以及動(dòng)態(tài)設(shè)定并行度兩方面來提升Join操作性能,具體的貢獻(xiàn)如下:
(1) 借鑒Semi Join算法[11]的思想,首先提取左表Join列數(shù)據(jù)構(gòu)建HashMap,利用該HashMap對(duì)右表進(jìn)行過濾,過濾掉右表Join列數(shù)據(jù)不在HashMap中的元組,即不符合連接條件的元組。通過該預(yù)處理流程,可以減少右表參與Shuffle操作的數(shù)據(jù)量和進(jìn)行Join匹配的數(shù)據(jù)量,降低網(wǎng)絡(luò)傳輸開銷和相關(guān)的CPU開銷。
(2) 考慮集群的CPU核數(shù)配置來動(dòng)態(tài)設(shè)置Join操作的并行度,盡可能充分利用集群資源,提升Join操作效率。
基于以上的優(yōu)化方法,本文進(jìn)行了理論分析和對(duì)比實(shí)驗(yàn)驗(yàn)證。實(shí)驗(yàn)結(jié)果顯示,對(duì)于兩張表Join列數(shù)據(jù)不完全匹配的情況,右表與左表匹配數(shù)據(jù)量越少,本文所提方案優(yōu)化效果越明顯。
Join操作是大規(guī)模查詢分析中最常見且開銷最大的操作之一,在Spark查詢引擎中利用Broadcast Hash Join算法、Shuffle Hash Join算法和Sort Merge Join算法來實(shí)現(xiàn)。Broadcast Hash Join算法局限性大,主要適用于兩表數(shù)據(jù)量相差極大,且小表數(shù)據(jù)量小于規(guī)定閾值的場景,本文不予討論。后兩種算法均屬于Repartition Join,主要是在進(jìn)行Join之前對(duì)兩表數(shù)據(jù)進(jìn)行Shuffle操作。Repartition Join主要處理參與Join的兩個(gè)表數(shù)據(jù)量都很大的場景,通過Shuffle操作實(shí)現(xiàn)兩表分區(qū)數(shù)據(jù)有效匹配,但是Shuffle操作的磁盤I/O開銷、網(wǎng)絡(luò)通信開銷和內(nèi)存處理開銷很大。
目前,針對(duì)各類查詢引擎進(jìn)行大表Join操作的查詢優(yōu)化已經(jīng)有了很多的研究。
文獻(xiàn)[12]提出了基于Hadoop框架的大數(shù)據(jù)集的Join優(yōu)化算法,算法主要利用Hadoop的分布式緩沖機(jī)制來優(yōu)化MapReduce框架的Reduce Side Join。對(duì)于參與Join操作的兩個(gè)表,算法先提取出其中一個(gè)表的連接屬性,然后利用Bit-map數(shù)據(jù)結(jié)構(gòu)壓縮成小數(shù)據(jù)文件存入磁盤中,通過Hadoop的分布式緩存機(jī)制將小數(shù)據(jù)文件傳輸?shù)礁鱾€(gè)分布式節(jié)點(diǎn)。然后,在Map階段,利用讀取到的小數(shù)據(jù)文件對(duì)另一個(gè)表的數(shù)據(jù)進(jìn)行過濾,過濾掉不在該小數(shù)據(jù)文件中的元組,即不滿足Join連接條件的元組。最后,在Reduce階段將兩表連接屬性值相同的元組執(zhí)行Join操作。該優(yōu)化算法利用其中一個(gè)表的連接屬性對(duì)另一個(gè)表的數(shù)據(jù)進(jìn)行預(yù)過濾,可以減少Shuffle階段的數(shù)據(jù)量,降低網(wǎng)絡(luò)傳輸開銷。但是該算法利用Bit-map數(shù)據(jù)結(jié)構(gòu)進(jìn)行壓縮,對(duì)另一個(gè)表數(shù)據(jù)過濾時(shí)存在一定的誤判率,對(duì)數(shù)據(jù)的過濾性不好。而且,該算法需要利用Hadoop的分布式緩存機(jī)制將小數(shù)據(jù)文件存入磁盤,額外增加了I/O開銷,影響了最終的Join優(yōu)化效果。
文獻(xiàn)[13]提出了一種基于Bloom Filter數(shù)據(jù)結(jié)構(gòu)的Spark大表等值連接的優(yōu)化算法。該算法首先對(duì)兩張數(shù)據(jù)表抽取連接屬性并進(jìn)行去重,然后利用Bloom Filter數(shù)據(jù)結(jié)構(gòu)對(duì)去重后的連接屬性分別進(jìn)行壓縮得到兩個(gè)位數(shù)組,對(duì)兩個(gè)位數(shù)組進(jìn)行“與”運(yùn)算,生成BF位數(shù)組。利用這個(gè)BF位數(shù)組再分別對(duì)兩張表進(jìn)行過濾,即過濾掉不符合連接條件的記錄。最后,對(duì)過濾后的兩張表執(zhí)行Hash Join算法,得到連接結(jié)果。該優(yōu)化算法利用Bloom Filter數(shù)據(jù)結(jié)構(gòu),同樣是過濾掉兩個(gè)表中不符合連接條件的元組,減少Shuffle操作的數(shù)據(jù)量。但是對(duì)兩張表的連接屬性進(jìn)行去重時(shí)涉及Shuffle操作,且隨著連接屬性值的增多,該部分開銷隨之增加。而且Bloom Filter的數(shù)據(jù)結(jié)構(gòu)壓縮效率沒有Bit-map數(shù)據(jù)結(jié)構(gòu)好,且為了降低誤判率,位數(shù)組的長度還需適當(dāng)增加,如何選定合適的誤判率以及對(duì)應(yīng)的位數(shù)組大小仍需進(jìn)行優(yōu)化。
文獻(xiàn)[14]在Spark平臺(tái)上針對(duì)大維表的等值連接提出了優(yōu)化算法。算法主要包括以下幾步:(1) 對(duì)事實(shí)表Fact的連接屬性值Key進(jìn)行去重,得到無重集FactUK,F(xiàn)actUK中元組不僅包括Key鍵,也包括其在Fact表中的存儲(chǔ)位置。(2) 將FactUK與維表Dim進(jìn)行預(yù)連接,執(zhí)行Partition Join。其中,重分區(qū)的個(gè)數(shù)按照Fact和Dim的大小進(jìn)行動(dòng)態(tài)設(shè)定,并利用一致性哈希算法來進(jìn)行重分區(qū),避免了因數(shù)據(jù)傾斜產(chǎn)生的連接負(fù)載不均的問題,然后在各個(gè)分區(qū)上對(duì)FactUK和Dim按照Key進(jìn)行cogroup分組并過濾掉不能匹配上的Key。(3) 將預(yù)連接結(jié)果按照Fact的分區(qū)號(hào)進(jìn)行重分區(qū),在各個(gè)分區(qū)將預(yù)連接結(jié)果與Fact表通過zipPartition操作進(jìn)行組裝,返回完整的連接結(jié)果。該優(yōu)化算法主要在于結(jié)合了Partition Join與Semi Join的優(yōu)勢,對(duì)兩表數(shù)據(jù)進(jìn)行重分區(qū)和預(yù)連接,減少了對(duì)于事實(shí)表全表數(shù)據(jù)進(jìn)行重分區(qū)的Shuffle開銷,同時(shí)也優(yōu)化了連接執(zhí)行的并行度、采用一致性哈希來進(jìn)行數(shù)據(jù)分區(qū),以此獲得更好的連接性能。但是該算法也增加了對(duì)事實(shí)表連接屬性值去重的Shuffle開銷,并且該算法假設(shè)事實(shí)表和維度表的數(shù)據(jù)可以完全緩存到內(nèi)存中,而在實(shí)際生產(chǎn)環(huán)境中,很難將所有數(shù)據(jù)完全緩存到內(nèi)存中。
本方案中各分區(qū)的連接算法仍采用Sort Merge Join,主要利用Semi Join思想對(duì)參與Join的兩表數(shù)據(jù)進(jìn)行預(yù)處理,并且動(dòng)態(tài)設(shè)定Join操作的并行度,以獲得更好的優(yōu)化效果。假設(shè)參與Join的兩個(gè)表分別為R表、S表,連接條件為R.A=S.B,其中A為R表key鍵,B為S表列族中的對(duì)應(yīng)列。定義該優(yōu)化方案為Semi Sort Merge Join,其對(duì)應(yīng)的Join執(zhí)行流程如下所示。
基于Semi Join算法進(jìn)行數(shù)據(jù)預(yù)處理的流程如下:
(1) 提取R表Join列。對(duì)R表中的元組進(jìn)行投影,只保留Join列的信息,結(jié)果定義為joinSet數(shù)據(jù)集。因?yàn)镠Base數(shù)據(jù)庫中Key鍵的唯一性,所以該Join列數(shù)據(jù)沒有重復(fù)值。并且只提取單列數(shù)據(jù),數(shù)據(jù)量少。
(2) 構(gòu)建HashMap。對(duì)(1)中的joinSet數(shù)據(jù)集構(gòu)建HashMap。為了盡可能減少HashMap的內(nèi)存開銷,設(shè)定HashMap的Key鍵為Join列值,value值統(tǒng)一設(shè)定為null。
(3) 過濾S表數(shù)據(jù)。利用(2)中HashMap的Key鍵匹配S表的Join列,對(duì)S表數(shù)據(jù)進(jìn)行過濾,剔除掉S表中B列數(shù)值不包含在HashMap中的元組。
利用Semi Join算法的思想對(duì)數(shù)據(jù)進(jìn)行預(yù)處理,在執(zhí)行Join操作前,在內(nèi)存中構(gòu)建R表Join列數(shù)據(jù)的HashMap,利用該HashMap對(duì)S表數(shù)據(jù)進(jìn)行精確過濾,過濾掉S表數(shù)據(jù)的Join列在R表中沒有相關(guān)匹配值的元組,可以減少后續(xù)操作中對(duì)S表的無用數(shù)據(jù)進(jìn)行Shuffle操作的磁盤I/O開銷、網(wǎng)絡(luò)開銷和相關(guān)的CPU開銷,也減少了后續(xù)參與Sort Merge Join的數(shù)據(jù)量。
對(duì)于Repartition Join的實(shí)現(xiàn),Shuffle操作就是按照設(shè)定的重分區(qū)的個(gè)數(shù)對(duì)R表和S表數(shù)據(jù)按照J(rèn)oin列數(shù)據(jù)的Hash值進(jìn)行重分區(qū)。在Spark SQL中,Shuffle操作重分區(qū)的個(gè)數(shù)主要由參數(shù)spark.sql.shuffle.partitions決定,默認(rèn)值是200。因?yàn)橹胤謪^(qū)的個(gè)數(shù)直接關(guān)系到Join操作執(zhí)行的并行度,所以合理設(shè)置重分區(qū)的個(gè)數(shù)尤為重要。如果該數(shù)值設(shè)定過小,會(huì)導(dǎo)致集群處理性能低且資源利用不合理,未發(fā)揮集群優(yōu)勢;如果設(shè)定過大,則網(wǎng)絡(luò)連接超負(fù)荷、任務(wù)調(diào)度開銷大,也不利于提升集群的處理性能。
所以在本文方案中,考慮利用動(dòng)態(tài)優(yōu)化的思想,在執(zhí)行過程中根據(jù)集群資源來動(dòng)態(tài)設(shè)定重分區(qū)的個(gè)數(shù),以此來優(yōu)化Join操作的并行度。定義重分區(qū)個(gè)數(shù)為Partitionnum,其計(jì)算公式如下:
Partitionnum=w×corenum
(1)
式中:corenum表示集群中executor實(shí)例總的CPU核數(shù)。因?yàn)榧旱挠?jì)算能力受制于集群中CPU核數(shù)的個(gè)數(shù)[15],因此用w×corenum表示集群資源的限制。如果分區(qū)個(gè)數(shù)等于corenum,即Join操作的并行度等于corenum,w=1,則可能某些運(yùn)行較快的任務(wù)較早運(yùn)行完,空閑出相應(yīng)的CPU核;如果設(shè)定w過大,則可能任務(wù)調(diào)度過于頻繁,開銷過大。本文設(shè)置多組實(shí)驗(yàn),設(shè)定不同的重分區(qū)個(gè)數(shù),測試得出w的最優(yōu)值為2。
綜上所述,優(yōu)化后該步驟進(jìn)行Shuffle操作的具體流程如下所示:(1) 通過配置—num-executors、—executor-cores分別獲取Spark集群中每個(gè)節(jié)點(diǎn)上executor的實(shí)例數(shù)和每個(gè)executor所分配的CPU核數(shù),則集群中executor實(shí)例總的CPU核數(shù)為兩者的乘積;(2) 按照式(1)設(shè)定spark.sql.shuffle.partitions參數(shù);(3) 利用Spark的Sort Based Shuffle策略對(duì)兩表數(shù)據(jù)進(jìn)行重分區(qū)。
通過Shuffle操作,將兩表的數(shù)據(jù)分為具有相同個(gè)數(shù)的多個(gè)分區(qū),然后對(duì)兩表具有相同分區(qū)號(hào)的分區(qū)數(shù)據(jù)進(jìn)行合并,執(zhí)行Sort Merge Join操作,主要流程如下:(1) 在各分區(qū)上,對(duì)兩表數(shù)據(jù)按照相同排序規(guī)則進(jìn)行排序;(2) 分別順序遍歷兩表數(shù)據(jù),按照J(rèn)oin連接條件進(jìn)行匹配。
綜上所述,該優(yōu)化方案的整體執(zhí)行計(jì)劃如圖2所示。(1) 利用TableScan獲取R表數(shù)據(jù),通過Map映射Join列數(shù)據(jù),并利用CollectAsMap算子構(gòu)建Join列的HashMap數(shù)據(jù)結(jié)構(gòu);(2) 利用TableScan獲取S表數(shù)據(jù),通過Map映射好各屬性后,利用R表Join列數(shù)據(jù)的HashMap對(duì)S表數(shù)據(jù)進(jìn)行過濾,過濾掉S表Join列數(shù)據(jù)不在HashMap中的元組;(3) 通過Map映射R表的各屬性;(4) 按照應(yīng)用配置,計(jì)算集群中executor實(shí)例總的CPU核數(shù),并根據(jù)Join列的Hash值以及2×corenum對(duì)R表和S表數(shù)據(jù)進(jìn)行重分區(qū);(5) 在對(duì)應(yīng)分區(qū)上分別對(duì)兩表數(shù)據(jù)進(jìn)行Sort Merge Join。
圖2 Semi Sort Merge Join算法執(zhí)行計(jì)劃
Spark并行處理框架是基于分布式共享內(nèi)存進(jìn)行計(jì)算處理的,即在任務(wù)執(zhí)行過程中,數(shù)據(jù)是緩存在內(nèi)存中進(jìn)行計(jì)算處理的,必要情況下需要將中間結(jié)果存入磁盤,例如Shuffle操作。為了簡化分析思路,本節(jié)方案分析不考慮將中間結(jié)果存入磁盤的情況,同時(shí)假設(shè)內(nèi)存充足,所有中間結(jié)果可以有效緩存在內(nèi)存中并進(jìn)行內(nèi)存計(jì)算。
本節(jié)的方案分析利用代價(jià)模型[16-17]進(jìn)行代價(jià)估算。代價(jià)估算基于第2節(jié)中的例子,定義代價(jià)估計(jì)中各參數(shù)及參數(shù)意義如表1所示。
表1 代價(jià)模型中各參數(shù)及意義
為了方便分析,定義Filterability(過濾性)表示通過R表的A列對(duì)S表進(jìn)行過濾后,S表過濾掉的元組數(shù)占原表元組數(shù)的比例,其計(jì)算公式如下:
(2)
對(duì)于分布式數(shù)據(jù)處理,主要考慮I/O代價(jià)、網(wǎng)絡(luò)傳輸代價(jià)和CPU的計(jì)算代價(jià)這三方面,總的代價(jià)估計(jì)如式(3)所示。
異丙托溴銨聯(lián)合布地奈德混懸液霧化吸入治療上呼吸道感染后慢性咳嗽的效果……………………………… 陳衍秋 陳英俊(3)335
Costtotal=CostI/O+CostComm+CostCPU
(3)
I/O代價(jià)主要考慮讀取參與Join的表所耗費(fèi)的時(shí)間,因?yàn)閮?yōu)化前后均是對(duì)數(shù)據(jù)進(jìn)行全表掃描,所以該部分代價(jià)不進(jìn)行對(duì)比分析。在分布式環(huán)境下,網(wǎng)絡(luò)傳輸開銷占比較大,相對(duì)而言CPU的計(jì)算開銷占比很小,所以本節(jié)重點(diǎn)分析優(yōu)化方案的網(wǎng)絡(luò)傳輸代價(jià)。優(yōu)化方案的網(wǎng)絡(luò)傳輸代價(jià)主要包括對(duì)R表Join列構(gòu)建的HashMap的網(wǎng)絡(luò)傳輸時(shí)間和Shuffle過程中進(jìn)行數(shù)據(jù)重分區(qū)所耗費(fèi)的時(shí)間,該部分的代價(jià)主要與網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量有關(guān),數(shù)據(jù)量越大,網(wǎng)絡(luò)傳輸代價(jià)越大。具體如式(4)所示。
Costcomm=(sizeof(H)×|H|+
max(sizenR,sizenfilterS))×comm
(4)
本文方案主要是在Spark實(shí)現(xiàn)的Sort Merge Join算法基礎(chǔ)上進(jìn)行改進(jìn)的,在各分區(qū)上仍舊采用Sort Merge Join算法執(zhí)行Join操作,所以本節(jié)主要對(duì)比Semi Sort Merge Join算法與Sort Merge Join算法的網(wǎng)絡(luò)通信代價(jià),對(duì)比情況如下:
Sort Merge Join算法未對(duì)數(shù)據(jù)進(jìn)行過濾,通過TableScan后直接將兩表數(shù)據(jù)進(jìn)行Shuffle操作,涉及的網(wǎng)絡(luò)傳輸代價(jià)如式(5)所示。
Costcomm=max(sizenR,sizenS)×comm
(5)
在Semi Sort Merge Join算法中,如果對(duì)R表Join列投影后行數(shù)和寬度都相對(duì)較小,而右表S表是寬表且行數(shù)較多,則sizeof(H)×|H|的網(wǎng)絡(luò)傳輸代價(jià)可以忽略不計(jì)。此時(shí),過濾性對(duì)網(wǎng)絡(luò)傳輸代價(jià)影響較大,F(xiàn)T越大,其過濾數(shù)據(jù)量越多,網(wǎng)絡(luò)傳輸數(shù)據(jù)量越少,對(duì)應(yīng)網(wǎng)絡(luò)傳輸代價(jià)越小。相較于文獻(xiàn)[12-13],如果誤判率較高,對(duì)S表數(shù)據(jù)的過濾性不好,不符合連接條件的數(shù)據(jù)產(chǎn)生的網(wǎng)絡(luò)傳輸量就比較大。但是,如果對(duì)R表Join列投影后行數(shù)和寬度都相對(duì)較大的情況下,sizeof(H)×|H|的網(wǎng)絡(luò)傳輸代價(jià)不容忽視。例如R表和S表全表匹配的情況下,生成HashMap以及將其進(jìn)行網(wǎng)絡(luò)傳輸?shù)拈_銷會(huì)降低Semi Sort Merge Join的執(zhí)行效率。
綜上所述,對(duì)于R表和S表數(shù)據(jù)不完全匹配的情況下,Semi Sort Merge Join算法的性能提升主要在于通過R表的Join列構(gòu)建HashMap后,過濾掉S表不符合連接條件的元組,從而減少Shuffle操作的數(shù)據(jù)量,減少Shuffle write和Shuffle read的開銷。
本實(shí)驗(yàn)使用三臺(tái)服務(wù)器搭建HBase集群和Spark集群,服務(wù)器的硬件配置以及相關(guān)的軟件版本如表2所示。
表2 相關(guān)配置及軟件版本
本文的實(shí)驗(yàn)均使用TPC-H Benchmark數(shù)據(jù)集[18]。TPC基準(zhǔn)是被全球數(shù)據(jù)庫廠商公認(rèn)的性能評(píng)價(jià)標(biāo)準(zhǔn),其中的TPC-H測試基準(zhǔn)是一組決策支持基準(zhǔn),可測試系統(tǒng)執(zhí)行復(fù)雜、高并發(fā)查詢的能力。
TPC-H數(shù)據(jù)集總共包括8張表,根據(jù)表之間的關(guān)聯(lián)性,選擇orders表(訂單表)和lineitem表(訂單明細(xì)表)來進(jìn)行Join查詢性能的實(shí)驗(yàn)。實(shí)驗(yàn)數(shù)據(jù)表使用TPC-H提供的數(shù)據(jù)生成器生成數(shù)據(jù),數(shù)據(jù)大小由比例系數(shù)SF決定的,根據(jù)SF的不同大小生成不同數(shù)量的測試數(shù)據(jù)集。
本節(jié)實(shí)驗(yàn)主要通過兩表Join查詢來測試Spark查詢引擎提供的Sort Merge Join算法的查詢處理性能以及本文方案的性能。實(shí)驗(yàn)主要利用orders表和lineitem表進(jìn)行等值連接,連接條件為orders.O_ORDERKEY=lineitem.L_ORDERKEY。
本文設(shè)置的每組實(shí)驗(yàn)均執(zhí)行5次,實(shí)驗(yàn)結(jié)果取平均值。
本節(jié)主要測試優(yōu)化方案中利用Semi Join算法進(jìn)行數(shù)據(jù)預(yù)處理對(duì)Sort Merge Join算法的提升效果。實(shí)驗(yàn)對(duì)應(yīng)的lineitem表的行數(shù)為1 200萬,為了測試兩表之間匹配程度對(duì)算法的影響,設(shè)置orders表的數(shù)據(jù)量分別為1 000、10萬、100萬、150萬、300萬,查詢對(duì)應(yīng)lineitem表中有效匹配元組數(shù)分別為4 000、40萬、400萬、600萬、1 200萬。
實(shí)驗(yàn)的結(jié)果如圖3所示。其中橫坐標(biāo)表示orders表的行數(shù),縱坐標(biāo)表示查詢執(zhí)行時(shí)間,單位為秒。對(duì)比兩個(gè)算法的執(zhí)行時(shí)間,當(dāng)orders表行數(shù)小于100萬時(shí),可以明顯看到經(jīng)過Semi Join預(yù)處理后Join執(zhí)行時(shí)間更短。而且,隨著orders表行數(shù)的減少,對(duì)應(yīng)linetime表中有效匹配元組數(shù)逐漸減少,過濾掉的數(shù)據(jù)量逐漸增多,通過Semi Join算法預(yù)處理后對(duì)于Spark的Sort Merge Join算法的提升效果更加明顯。但是,隨著orders表數(shù)據(jù)量逐漸增加,對(duì)應(yīng)linetime表中有效匹配元組數(shù)也在增加,過濾掉的數(shù)據(jù)量也逐漸減少,利用Semi Join進(jìn)行預(yù)處理反而不利于Sort Merge Join的執(zhí)行,主要是受提取orders表的Join列數(shù)據(jù)構(gòu)建HashMap的內(nèi)存和網(wǎng)絡(luò)傳輸開銷的影響。
圖3 Sort Merge Join算法經(jīng)過Semi Join預(yù)處理與未經(jīng)Semi Join算法預(yù)處理響應(yīng)時(shí)間對(duì)比
所以,Join之前利用Semi Join算法進(jìn)行數(shù)據(jù)預(yù)處理主要適用于左表與右表Join列值不完全匹配的場景。右表與左表Join列值匹配數(shù)越少,過濾掉的右表數(shù)據(jù)量越多,則參與Shuffle操作以及Sort Merge Join的數(shù)據(jù)量越少,對(duì)Join操作的性能提升越有幫助,提升效果越顯著。
由4.4節(jié)可知,對(duì)于左表與右表Join列值不完全匹配的場景,經(jīng)過Semi Join預(yù)處理后,Sort Merge Join算法的提升效率顯著。在此基礎(chǔ)上,進(jìn)一步測試Join操作的并行度對(duì)Sort Merge Join算法的提升效果,即對(duì)比本文所定義的Semi Sort Merge Join算法的完整優(yōu)化方法與Sort Merge Join算法的性能。
在本節(jié)實(shí)驗(yàn)中,選用100萬行的orders表和1 200萬行的lineitem表進(jìn)行實(shí)驗(yàn)。在搭建的Spark集群中,集群Worker節(jié)點(diǎn)數(shù)為2,每個(gè)節(jié)點(diǎn)上分配2個(gè)executor,對(duì)每個(gè)executor分配2個(gè)核,則集群中的總核數(shù)為8。在實(shí)驗(yàn)設(shè)計(jì)中,調(diào)整參數(shù)spark.sql.shuffle.partitions來設(shè)定shuffle時(shí)不同的重分區(qū)個(gè)數(shù),從而影響到Join操作的并行度,以此來測試Join操作的并行度對(duì)執(zhí)行時(shí)間的影響,結(jié)果如圖4所示。
圖4 Join操作并行度對(duì)查詢時(shí)間的影響
可以看出,當(dāng)設(shè)定重分區(qū)個(gè)數(shù)為16,即Join操作的并行度為16時(shí),Semi Sort Merge Join算法執(zhí)行時(shí)間最短。所以,當(dāng)設(shè)定重分區(qū)個(gè)數(shù)為Spark集群的CPU核數(shù)的2倍時(shí),Join執(zhí)行性能最優(yōu)。因此,對(duì)于2.2節(jié)中的式(1),設(shè)定w系數(shù)為2。
4.4節(jié)中僅測試經(jīng)過Semi Join預(yù)處理后對(duì)于Sort Merge Join的提升效率,未測試Join操作的并行度優(yōu)化對(duì)算法的影響。在100萬行的orders表與1 200萬行的lineitem表進(jìn)行Join操作的場景中,未經(jīng)過Semi Join預(yù)處理的執(zhí)行時(shí)間為266 s,經(jīng)過Semi Join預(yù)處理的執(zhí)行時(shí)間為249 s,優(yōu)化后算法性能提升了6.39%。在本節(jié)中進(jìn)一步設(shè)置好最佳并行度后,Semi Sort Merge Join算法的執(zhí)行時(shí)間為222 s,相較于未經(jīng)過Semi Join預(yù)處理的Sort Merge Join算法性能提升了16.54%。可見,通過動(dòng)態(tài)設(shè)定連接并行度對(duì)Join操作的查詢也有很大的幫助。
Spark支持大規(guī)模數(shù)據(jù)處理,對(duì)任務(wù)進(jìn)行分布式并行執(zhí)行。但其涉及開銷較大的Join操作,一直是大數(shù)據(jù)查詢分析的瓶頸。本文對(duì)Spark現(xiàn)有的大表Join實(shí)現(xiàn)算法進(jìn)行了研究,發(fā)現(xiàn)其未考慮兩表Join列數(shù)據(jù)匹配關(guān)系對(duì)Shuffle操作的影響。因此,本文基于Semi Join,根據(jù)兩表Join列之間的匹配關(guān)系,提出了一種改進(jìn)的Join實(shí)現(xiàn)算法。該算法利用左表Join列數(shù)據(jù)所構(gòu)建的HashMap對(duì)右表數(shù)據(jù)進(jìn)行過濾,主要適用于兩表Join列數(shù)據(jù)不完全匹配的情況,且右表與左表匹配數(shù)據(jù)量越少,該算法優(yōu)化效果越明顯。