陸 可 桂 偉 江雨燕 杜萍萍
(安徽工業(yè)大學管理科學與工程學院 安徽 馬鞍山 243000)
基于Spark的并行FP-Growth算法優(yōu)化與實現
陸 可 桂 偉 江雨燕 杜萍萍
(安徽工業(yè)大學管理科學與工程學院 安徽 馬鞍山 243000)
頻繁模式挖掘作為模式識別的重要問題,一直受到研究者的廣泛關注。FP-Growth算法因其高效快速的特點,被大量應用于頻繁模式的挖掘任務中。然而,該算法依賴于內存運行的特性,使其難以適應大規(guī)模數據計算。針對上述問題,圍繞大規(guī)模數據集下頻繁模式挖掘展開研究,基于Spark框架,通過對支持度計數和分組過程的優(yōu)化改進了FP-Growth算法,并實現了算法的分布式計算和計算資源的動態(tài)分配。運算過程中產生的中間結果均保存在內存中,因此有效減少數據的I/O消耗,提高算法的運行效率。實驗結果表明,經優(yōu)化后的算法在面向大規(guī)模數據時要優(yōu)于傳統(tǒng)的FP-Growth算法。
頻繁模式挖掘 FP-Growth算法 分布式計算 Spark框架
R.Agrawal提出Apriori算法用于解決關聯規(guī)則的挖掘問題后[1],頻繁模式的挖掘就一直受到廣泛關注。之后,Han提出的FP-Growth算法由于其自身在計算效率上的優(yōu)勢[2],得到了更為廣泛的運用。然而,隨著數據規(guī)模的不斷增大,該算法在實際運用中也存在一些尚待改進的問題。一方面,FP-Growth算法在統(tǒng)計各元素項的支持度計數過程中會消耗大量時間;另一方面,大規(guī)模數據集的維度跨越過大,導致算法構建的頻繁模式樹(FP-Tree)難以存入內存,影響了算法的運行。因此,大規(guī)模數據集上如何有效挖掘關聯規(guī)則成為近年來研究人員關注的重點。
分布式計算是解決大規(guī)模數據環(huán)境下數據挖掘任務的常用方法,也是一種通過使用低成本的多個硬件達到高性能計算機性能的一種技術。其中,段孝國等在文獻[3]中介紹了中間件技術、P2P技術、移動Agent技術、網格技術、云計算和Web Service這幾種分布式運算技術的關鍵技術和國內外應用現狀。胡敏等在文獻[4]中對上述幾種分布式運算技術進行了分析和比較。隨著眾多學者對分布式運算技術的進一步了解和學習,已有學者將分布式運算技術應用于數據挖掘中。文獻[5]中,王軼等提出了一種計算框架并將其應用于分布和并行環(huán)境的數據挖掘中,最終建立一種可以應用于分布式并行數據挖掘的框架。王小妮等[6]提出了基于云計算的分布式數據挖掘平臺架構,并在此基礎上提出了一種新的分布式數據挖掘模式。當前,關于分布式計算的研究集中在分布式系統(tǒng)和分布式環(huán)境研究兩方面[4]。其中,基于分布式計算環(huán)境和計算框架實現數據挖掘應用引起了學者們的廣泛關注。Zeng等[7]詳細介紹了分布式計算環(huán)境的方法和應用程序的最新技術,并對分布式計算環(huán)境領域進行了總結。Moteria等[8]將分布式計算環(huán)境同高效可擴展的分布式算法相結合并運用于頻繁模式的挖掘中,取得了較好的效果。Song等[9]在KNN算法的基礎上,結合谷歌提出的Map/Reduce編程模型為大數據的背景下解決基于KNN的實際問題提供了一個指南。Chen等[10]針對分布式數據挖掘中廣泛存在著的候選集冗余和頻繁模式挖掘低效率的問題,提出了基于Hadoop分布式計算框架的并行算法,在分布式節(jié)點中構建和挖掘TPT-Tree,提高了頻繁模式數據挖掘的效率。近年來,隨著Hadoop分布式計算框架的廣泛運用,有學者基于該框架和Map/Reduce編程模型實現了FP-Growth算法的并行化改進[11-13]。文獻[14]在Hadoop框架基礎上提出了一種基于布爾矩陣和Map/Reduce編程模型的FP-Growth算法BPFP,其執(zhí)行效率和速率均優(yōu)于原始算法。上述基于Hadoop平臺的方法在一定程度上提高了算法在挖掘大規(guī)模數據關聯規(guī)則的性能。但在Hadoop框架下,Map/Reduce編程模型各步驟的中間結果均存儲到硬盤中。這種方法可能在處理大規(guī)模數據時會存在著因頻繁讀取硬盤而造成處理時間大量增加的問題。而Spark[15-16]是典型的基于RDD彈性分布內存的分布式框架。相較于Hadoop,Spark所有的中間結果均存儲在內存中,故省去了大量的硬盤I/O操作。薛志云[17]同時搭建了Hadoop和Spark平臺,對Hadoop和Spark的性能進行比較,同一組數據集在兩個平臺上進行Kmeans聚類的時間對比,實驗結果表明Spark相較于Hadoop更適用于需大量迭代的機器學習算法,且伴隨著數據集的增大Spark的優(yōu)勢愈明顯。
故在分而治之的思想基礎上,運用Spark分布式框架,利用其自身基于內存進行計算的特點,提高了FP-Growth算法的迭代效率。此外,還對算法的支持度計數統(tǒng)計和數據分組過程進行了優(yōu)化,提高了算法的運行效率和準確率。
FP-Growth算法使用特殊的數據結構存儲數據集,算法執(zhí)行過程中不產生候選集,而是采用一種頻繁項集不斷增長的方式進行挖掘。邏輯上算法分為兩個步驟:第一步將數據集以FP樹的特殊數據結構存儲;第二步就是遞歸地挖掘FP樹[18]。為構建FP樹,需對原始數據集掃描兩遍,對所有元素項的出現次數進行計數。(1) 第一次遍歷數據集,獲得每個元素項的出現頻率,找出滿足最小支持度計數的元素項。同時,將所有元素按照支持度計數降序排列。得到了有一項的頻繁項集,記為頭指針。(2) 第二次遍歷數據集,刪除每條事務中不滿足最小支持度計數的項,同時,利用頭指針中各元素項的排列順序,對各條事務中的單個元素項進行降序排序。(3) 完成上述操作后,從null開始,不斷添加過濾、排序后的事務,如果樹中已有現有元素,則增加現有元素的值(即經過該路徑),如果不存在,則向樹添加分枝,直到所有事務均添加到樹中為止。首先,我們得到一組事務數據集,其對應事務ID及事務中的元素項見表1。
表1 事務數據集
最終通過上述三個步驟經過對事務數據集的反復進行基于最小支持度計數的篩選和排序后,得到構建完成的FP樹,如圖1所示。
圖1 已構建的FP樹
近年來,部分學者發(fā)現,Spark分布式框架較Hadoop更為適用于這一類遞歸數據挖掘算法。文獻[19]在Spark框架上實現了Apriori算法的并行化改進,取得了較好的效果。
Spark是針對Map/Reduce在處理迭代式算法時效率較低問題時提出的新的內存計算框架,在保留Map/Reduce相關特性的基礎上,Spark基于內存的集群方式較Map/Reduce運行速率快100倍[20]。更為重要的是Spark由于其基于內存的特性,能夠大量部署在廉價的機器上進而形成一個大規(guī)模集群[21]。在實際應用中,Spark框架具有較強的靈活性。
FP-Growth算法作為迭代式算法,在實現并行化的過程中其最大的特點是在多個并行操作中重用數據。這一類算法都用一個函數對同一數據集進行反復的計算,Spark不同于Map/Reduce的是其中間輸出結果均保存在內存中,這樣就避免了對HDFS進行頻繁的讀/寫操作。因此,Spark更加適用于FP-Growth這一類需要迭代的數據挖掘算法。
2.1 基于Spark框架的并行算法優(yōu)化
在Spark上的并行化實現主要分為3個步驟:(1) 計算所有項的支持度計數,這里的思路類似于WordCount過程。WordCount是最基礎也是最能體現Map/Reduce編程模型的程序之一,其最終輸出的結果為鍵值對的形式,和FP-Growth算法要求的鍵值對形式相同。首先,main函數對Map/Reduce Job進行初始化,再遍歷每一個事務,累計單個項的支持度計數,在通過reduceByKey算子將待輸出數據與一個共用的key結合,最終運用算子中的lamdba函數將所有單個元素項的鍵值對合成一個結果,保存在內存中。(2) 數據分組,將F_list中的條目分成G個組,就形成了一個Group_list,這其中每一個Group都包含一組item的集合。在這一步中,mapper完成的主要功能是數據集分區(qū),逐個處理數據分區(qū)中的事務。將事務分為item,每個item根據Group_list映射到合適的group中去,然后在reduce中并行執(zhí)行FP-Growth算法。(3) 結果聚合將所有機器上的處理結果聚合,聚合各臺機器上得到的頻繁項集,并統(tǒng)計支持度。其主要過程如圖2所示。其中,原始數據和最終輸出結果均保存在HDFS中。
圖2 算法實現
下面以偽代碼的形式給出在Spark框架下優(yōu)化后的并行化算法的實現。如算法1所示。
算法1基于Spark框架的并行優(yōu)化算法
Input:事務數據集D的HDFS鏈接:HDFS://Hadoop1:9000/ min_sup
Output:頻繁項集L(結果以文本的形式存儲在HDFS中)
step1: var sc =new SparkContext(conf)
step2: var file = sc.textFile(arg(0)) {
step3: for {Item /*每個事務的項*/
step4: compute(sup);}
step5: var D_list=item.flatMap /*事務分組*/
step6: FP_Growth(D_list) /*并行執(zhí)行優(yōu)化后的FP_Growth算法*/
step7: def decare(result){ /*笛卡爾積*/
step8: return L} /*輸出最終結果,并保存在HDFS中*/
2.2 支持度計數統(tǒng)計及分組過程優(yōu)化
本文首先優(yōu)化了支持度計數統(tǒng)計過程。此外,基于分布式框架優(yōu)化FP-Growth算法較重要的一個環(huán)節(jié)是需考慮好數據分組的問題。在并行化運行過程中,整個算法的運行時間取決于最后一個組完成任務的時間,所以對于每一個組的計算量應盡量相等。在Spark框架中,分組的數目決定了整個框架的計算粒度。整個并行計算過程中,每一階段的分組數目和任務數目均保持一致。一方面要盡量保證每一個子計算區(qū)的分組數相等,另一方面要盡量保證每一個分組的計算量保持平衡。在算法的支持度計數統(tǒng)計過程中,加入了reduceByKey算子。當采用reduceByKey時,可以在每個分區(qū)移動數據之前將待輸出數據與一個共用的key結合。同時運用算子中的lamdba函數將所有單個元素項的鍵值對合成一個結果。
下面給出了支持度計數統(tǒng)計過程中的偽代碼,如算法2所示。
算法2支持度計數統(tǒng)計優(yōu)化算法偽代碼
Input:事務數據集f
Output:各個項的支持度計數sup
/*compute(支持度)之前將重復行合并,并刪除事務編號*/
step1: var f = file.map
step2: .drop()
//執(zhí)行刪除操作;去掉多余的事務編號
step3: .toList()
step4: .reduceBykey()
//合并相同的行
/*compute(支持度),并通過cache()存儲在內存中*/
step5: compute(sup)
step6: .reduceBykey()
//在分組操作移動前,將相同項支持度結果合并
step7: .cache()
//所得全部支持度計數結果存儲在內存中
在考慮分組問題時,首先,對每個Spark分區(qū)的分組數進行了定義,可根據需要調整分組數,從而限定了每個子分區(qū)的分組數都保持相等。通過加入groupByKey算子,對相同元素項的鍵值對進行移動一方面減少了分組的時間損耗。另一方面保證了相似的元素移動到同一子分區(qū)中,保證了各個子分區(qū)的計算量盡量保證接近。
同樣,我們給出了分組優(yōu)化過程的偽代碼,如算法3所示。
算法3分組優(yōu)化過程偽代碼
Input:f_list
Output: group
//分完組的事務數據數據集
step1: var pum =()
//人工設定一個數目來控制分組規(guī)模
step2: var g_size = (g_count + pum -1)
//人工控制分組規(guī)模
step3: var f_list = item.flatMap(t=> {
step4: var pre = -1;i= t.1.length -1
step5: var result = List[(Int, (List[Int],Int))]()
step6: while (i >= 0) {
step7: //執(zhí)行循環(huán),判定item中的數據是否類似
step8: End for(i=-1)
//當事務數組的長度為0時,分組過程結束
/* g_size是分組的個數,item即數據集中事務以及其出現次數的鍵值對*/
/*將item中的數據分為g_size個組,所有分組中的均為相關的數據*/
/*每個組中都包含著一組item*/
step9: .groupBykey()
//對相同的項進行移動
step10: .cache()
//存儲在內存中
3.1 實驗環(huán)境設置
為驗證優(yōu)化后的并行化FP_Growth算法的有效性,在私有云平臺創(chuàng)建三臺四核、6 GB內存的服務器。其中,每臺服務器擁有100 GB存儲,系統(tǒng)為Centos 6.5。
整個Spark框架采用主從式分布式集群,集群中包括三個節(jié)點,其中,Hadoop1為主節(jié)點(Master)、Hadoop2、Hadoop3為從節(jié)點(slave)。服務器中的jdk版本為1.7.0_79,Hadoop版本為2.2.0,Spark版本為1.1.0。算法的實現語言為scala 2.10.4。
3.2 實驗框架搭建
集群1包括三個節(jié)點,各個節(jié)點之間設置免密碼SSH訪問,各個節(jié)點IP地址和主機名稱如表2。其中,表中內存為Spark集群每個worker的運行內存,可通過修改Spark配置文件在硬件內存范圍內自行調節(jié)。保證了良好的擴展性。
表2 集群1網絡配置
3.3 實驗數據及結果
配置四臺云服務器,為保證單機系統(tǒng)和集群的硬件參數保持一致,其中一臺服務器作為單機系統(tǒng),配置為四核、6 GB內存。利用剩下的三臺服務器搭建Spark集群。集群中含有三個節(jié)點,每個節(jié)點的worker內存設為2 GB,保證了總內存為6 GB。在單機系統(tǒng)上運行FP-Growth算法,Spark集群中運行優(yōu)化后的并行FP-Growth算法。在實驗中使用了機器學習領域中常用的四個數據集,分別為mushroom.dat、pumsbstar.dat、accidents.dat、webdocs.dat。這四個數據集的詳細說明如表3所示。
表3 數據集說明
最后將兩組實驗的運行時間進行比較。實驗結果如表4所示。表4中是單機系統(tǒng)下的FP-Growth算法運行時間,是優(yōu)化后的并行FP-Growth算法在Spark集群下的運行時間。為了保證實驗數據的可靠性,每個數據集均運行三次,取三次運行時間的平均值作為最終的運行時間。
表4 實驗結果對比
實驗初始階段,在處理較小規(guī)模數據集的情況下,由于Spark集群在啟動和加載過程中消耗了部分時間,其運行時間與單機系統(tǒng)相差不大。但是,隨著數據規(guī)模的不斷增加,Spark集群充分發(fā)揮了其自身優(yōu)勢,在處理大規(guī)模數據時,處理性能已經遠遠優(yōu)于單機系統(tǒng)。其次,通過優(yōu)化FP-Growth算法的支持度計數統(tǒng)計及分組過程,運行時間已經大大縮減。通過對比實驗,Spark集群在處理相同大規(guī)模數據集時,其運行速率已遠遠超越單機情況下的結果。
實驗中,本文還考慮到了Spark集群的一大獨有優(yōu)勢,即通過增加/刪除節(jié)點迅速地調整集群的規(guī)模以適應不同需求的計算。因此,在保證Spark集群worker總內存6 GB不變的前提下,調整從節(jié)點的數目和主從節(jié)點的worker內存,使用優(yōu)化后的并行FP-Growth算法來處理accidents.dat等數據集。其中,先設置一組含有一個節(jié)點的Spark集群,worker內存設為6 GB,運行時間記為,待第一組集群實驗完畢后,再設置一組含有兩個節(jié)點的Spark集群,worker內存分別設為3 GB,總內存保證為6 GB,運行時間記為。設置一組含有三個節(jié)點的Spark集群,worker內存分別設為2 GB,總內存保證為6 GB,運行時間即為上述表4中的實驗結果,記為T3。實驗結果如表5所示。
表5 集群實驗結果
通過表5的實驗結果,可以看到,隨著節(jié)點個數的不斷增加,整個Spark集群的處理效率更高,運行同一數據集的時間在逐步遞減。當數據集規(guī)模不斷增大時,這種遞減的幅度逐漸增大、愈發(fā)明顯。同樣,在其他數據集上也得到了類似的結果。
綜上,經過優(yōu)化后的并行FP-Growth算法執(zhí)行效率更高,且整個Spark集群的可擴展性好,能夠適應各種不同的計算任務。
本文基于Spark分布式框架實現了現有FP-Growth算法的并行化,并優(yōu)化了算法的支持度計數統(tǒng)計和數據分組過程。通過設置對比實驗,比較了單機系統(tǒng)下運行FP-Growth與Spark集群下運行優(yōu)化后的并行FP-Growth算法的速率。同時,從集群擴展性的角度設置了另一組實驗。結果表明,Spark集群具有較好的擴展性,可以適應各種不同的計算任務。且在Spark集群上運行優(yōu)化后的并行FP-Growth算法具有很高的性能。
基于本文開展的相關工作,發(fā)現在處理一些大規(guī)模數據時,Spark的RDD數據區(qū)內存參數設置可能會影響到算法的運行速率。因此,下一步將考慮內存這一重要參數對于Spark集群性能的影響機制。
[1] Agrawal R,Imieliński T,Swami A.Mining association rules between sets of items in large databases[C]//Acm sigmod record.ACM,1993,22(2):207-216.
[2] Han J,Pei J,Yin Y,et al.Mining frequent patterns without candidate generation:A frequent-pattern tree approach[J].Data mining and knowledge discovery,2004,8(1):53-87.
[3] 段孝國.分布式計算技術介紹[J].電腦知識與技術,2011,7(22):5463-5465.
[4] 胡敏,付琍.對幾種典型分布式計算技術的比較[J].電腦知識與技術,2010,6(5):1244-1246.
[5] 王軼,達新宇.分布式并行數據挖掘計算框架及其算法研究[J].微電子學與計算機,2006,23(9):223-225.
[6] 王小妮,高學東,倪曉明.基于云計算的分布式數據挖掘平臺架構[J].北京信息科技大學學報(自然科學版),2011,26(5):19-24.
[7] Zeng L,Xu L,Shi Z,et al.Distributed computing environment:Approaches and applications[C]//IEEE International Conference on Systems.IEEE,2007:3240-3244.
[8] Moteria P M,Ghodasara Y R.Novel Most Frequent Pattern Mining Approach,Using Distributed Computing Environment[J].International Journal of Engineering Research & Technology,2013,2(2):1-3.
[9] Song G,Rochas J,Beze L,et al.K Nearest Neighbour Joins for Big Data on MapReduce:a Theoretical and Experimental Analysis[J].IEEE Transactions on Knowledge & Data Engineering,2016,28(9):2376-2392.
[10] Bo C,Yong D C,Xiue G.A frequent pattern parallel mining algorithm based on distributed sliding window[J].Computer Systems Science and Engineering,2016,31(2):101-107.
[11] 呂雪驥,李龍澍.FP-Growth算法MapReduce化研究[J].計算機技術與發(fā)展,2012,22(11):123-126.
[12] 楊勇,王偉.一種基于MapReduce的并行FP-Growth算法[J].重慶郵電大學學報(自然科學版),2013,25(5):651-657,670.
[13] 施亮,錢雪忠.基于Hadoop的并行FP-Growth算法的研究與實現[J].微電子學與計算機,2015,32(4):150-154.
[14] Xingshu C,Shuai Z,Tao T.FP-Growth Algorithm Based on Boolean Matrix and MapReduce[J].Journal of South China University of Technology,2014,42(1):135-141.
[15] Sankar K,Karau H.Fast Data Processing with Spark[M].Packt Publishing Ltd,2015.
[16] Zaharia M,Chowdhury M,Das T,et al.Fast and interactive analytics over Hadoop data with Spark[J].USENIX Login,2012,37(4):45-51.
[17] 薛志云,何軍,張丹陽,等.Hadoop和Spark在實驗室中部署與性能評估[J].實驗室研究與探索,2015,34(11):77-81.
[18] Harrington P.機器學習實戰(zhàn)[M].北京:人民郵電出版社,2013.
[19] Qiu H,Gu R,Yuan C,et al.Yafim:a parallel frequent itemset mining algorithm with Spark[C]//Parallel & Distributed Processing Symposium Workshops (IPDPSW),2014 IEEE International.IEEE,2014:1664-1671.
[20] 黎文陽.大數據處理模型Apache Spark研究[J].現代計算機,2015,8(13):55-60.
[21] 高彥杰.Spark大數據處理:技術、應用與性能優(yōu)化[M].北京:機械工業(yè)出版社,2014.
OPTIMIZATIONANDIMPLEMENTATIONOFPARALLELFP-GROWTHALGORITHMBASEDONSPARK
Lu Ke Gui Wei Jiang Yuyan Du Pingping
(SchoolofManagementScienceandIndustrialEngineering,AnhuiUniversityofTechnology,Maanshan243000,Anhui,China)
As an important problem of pattern recognition, frequent itemsets mining has been paid more and more attention by researchers. FP-Growth algorithm is widely used in frequent pattern mining because of its high efficiency and fast performance. However, the algorithm relies on the characteristics of local memory operation, making it difficult to adapt to large-scale data calculation. To solve these problems, this paper focuses on the research of frequent itemsets mining in a distributed environment. The FP-Growth algorithm which based on the Spark framework was improved by optimizing the support count and grouping process. Furthermore, the distributed computation and the dynamic allocation of computing resources were realized. The intermediate results were stored in the memory, so the I/O consumption was reduced and the efficiency of the algorithm was improved. The experimental results show that the improved distributed FP-Growth algorithm is superior to the traditional FP-Growth algorithm for large-scale data.
Frequent pattern mining FP-Growth algorithm Distributed computing Spark framework
TP3
A
10.3969/j.issn.1000-386x.2017.09.053
2016-11-09。國家自然科學基金項目(71371013);安徽工業(yè)大學校青年教師科研基金項目(QZ201420);安徽省教育廳自然科學基金項目(KJ2016A087)。陸可,講師,主研領域:數據挖掘與機器學習。桂偉,碩士生。江雨燕,教授。杜萍萍,碩士生。