黃承寧
(南京工業(yè)大學(xué)浦江學(xué)院,江蘇 南京 211222)
“圖計(jì)算”是以“圖論”為基礎(chǔ)的對(duì)現(xiàn)實(shí)世界的一種“圖”結(jié)構(gòu)的抽象表達(dá),以及在這種數(shù)據(jù)結(jié)構(gòu)上的數(shù)據(jù)計(jì)算模式。圖數(shù)據(jù)結(jié)構(gòu)很好地表達(dá)了數(shù)據(jù)之間的關(guān)聯(lián)性,而關(guān)聯(lián)性計(jì)算是大數(shù)據(jù)計(jì)算的核心—通過獲得數(shù)據(jù)的關(guān)聯(lián)性,可以從很多的海量數(shù)據(jù)中抽取有用的信息[1]。隨著Web2.0、社交網(wǎng)絡(luò)、移動(dòng)互聯(lián)網(wǎng)或者物聯(lián)網(wǎng)的發(fā)展,數(shù)據(jù)量呈現(xiàn)幾何級(jí)數(shù)增加,數(shù)據(jù)結(jié)構(gòu)將更隨意,對(duì)于信息的抽取分析也將更困難。因此,數(shù)據(jù)的增長(zhǎng)速度也將超過計(jì)算機(jī)數(shù)據(jù)處理的增長(zhǎng)速度,這便產(chǎn)生了大數(shù)據(jù)問題。為了解決在大數(shù)據(jù)環(huán)境下從海量數(shù)據(jù)中抽象出有用信息的問題,許多新型基于圖數(shù)據(jù)結(jié)構(gòu)的計(jì)算平臺(tái)與引擎相繼提出。
Google為了應(yīng)對(duì)圖計(jì)算的需求,推出了基于圖計(jì)算的“計(jì)算框架”—Pregel,為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎Spark也有支持圖計(jì)算機(jī)器學(xué)習(xí)的模塊—GraphX,機(jī)器學(xué)習(xí)框架GraphLab中底層框架圖計(jì)算PowerGraph[2-3]。這些分布式圖計(jì)算系統(tǒng)通過集群強(qiáng)大的計(jì)算資源來處理大規(guī)模的圖數(shù)據(jù),但是同步開銷和容錯(cuò)開銷使得分布式圖計(jì)算系統(tǒng)存在瓶頸,之后GraphChi與X-Stream系統(tǒng)提供了解決方案,但是這些單機(jī)圖計(jì)算系統(tǒng)在耗時(shí)性能與磁盤帶寬利用率上表現(xiàn)不佳。GridGraph圖計(jì)算系統(tǒng)將頂點(diǎn)分割成一維的chunks,再根據(jù)邊的源頂點(diǎn)和目的頂點(diǎn)分割成二維的邊blocks。GridGraph通過聚合更新遍歷blocks一次完成最優(yōu)迭代全局計(jì)算,無縫地?cái)U(kuò)展內(nèi)存容量和磁盤帶寬,使得GridGraph超過其他圖計(jì)算系統(tǒng)引擎一個(gè)數(shù)量級(jí),成為負(fù)載均衡、性能優(yōu)異的圖計(jì)算機(jī)系統(tǒng)。
在海量圖計(jì)算中經(jīng)常面臨迭代計(jì)算,即在計(jì)算中圖頂點(diǎn)會(huì)收到上一個(gè)循環(huán)頂點(diǎn)的信息,并且將這些信息匯總再次傳遞給下一個(gè)循環(huán)節(jié)點(diǎn),在上下循環(huán)中更新節(jié)點(diǎn)信息。當(dāng)前主流的圖計(jì)算模式或者框架基本上都遵循整體BSP(bulk synchronous parallel)同步并行計(jì)算模式[4]。在此算法和框架中,每個(gè)節(jié)點(diǎn)會(huì)分布計(jì)算和匯總各個(gè)節(jié)點(diǎn)信息,區(qū)別在于不同框架中,各個(gè)節(jié)點(diǎn)之間存在不同的通信方式??傮w框架中的一個(gè)算法由一系列SuperStep組成,如圖1所示。
圖1 BSP框架處理流程
BSP整個(gè)框架由一系列超步(SuperStep)組成,在這一系列步驟中,每個(gè)步驟類似節(jié)點(diǎn),進(jìn)行分布式計(jì)算與匯總。每個(gè)SuperStep大致可以歸納為三個(gè)環(huán)節(jié):局部計(jì)算環(huán)節(jié),相關(guān)資源只對(duì)本地的數(shù)據(jù)進(jìn)行計(jì)算匯總;全局計(jì)算環(huán)節(jié),相關(guān)資源只對(duì)本地之外的數(shù)據(jù)進(jìn)行計(jì)算匯總;收尾計(jì)算環(huán)節(jié),系統(tǒng)等待相關(guān)資源計(jì)算結(jié)束進(jìn)行回收。具體如圖2所示。
圖2 超步三個(gè)環(huán)節(jié)
從抽象高層次看BSP是“計(jì)算-通信-同步”的模式[5-6], 該模型包括三個(gè)模塊:一組具有局部?jī)?nèi)存的分布式處理器,負(fù)責(zé)本地集群中每個(gè)節(jié)點(diǎn)的并行計(jì)算機(jī)進(jìn)程;基于選路器(Router)的全局通信網(wǎng)絡(luò),用于負(fù)責(zé)在BSP模型中對(duì)圖節(jié)點(diǎn)訪問的不同Processor之間的消息數(shù)據(jù)傳遞,以便實(shí)現(xiàn)同步;支持所有處理單元間全局路障同步的機(jī)制,負(fù)責(zé)協(xié)調(diào)每個(gè)路障器的執(zhí)行時(shí)序時(shí)間。即如上所述,每一個(gè)SuperStep計(jì)算過程由局部計(jì)算環(huán)節(jié)、全局計(jì)算環(huán)節(jié)和收尾計(jì)算環(huán)節(jié)組成。每一個(gè)并行完成,意味著將由本SuperStep進(jìn)入下一個(gè)SuperStep。在BSP模式中最重要的是bsp方法,該方法程序代碼片段如下:
public voidbsp(BSPPeer
int in=0;
for(int i=0;i double x=2.0*Math.random()-1.0; double y=2.0*Math.random()-1.0; if((Math.sqrt(x*x+y*y)<1.0)){ in++; } } double data=4.0*in/iterations; peer.send(masterTask,new DoubleWritable(data)); peer.sync(); } 其中for循環(huán)體代碼完成本地分布式處理計(jì)算,peer調(diào)用send()方法完成全局不同的Processor之間的消息數(shù)據(jù)通信,peer調(diào)用send()方法實(shí)現(xiàn)柵欄同步。 并行處理圖計(jì)算系統(tǒng)將圖數(shù)據(jù)全部加載到集群中的內(nèi)存進(jìn)行計(jì)算,理論上隨著集群規(guī)模的增大其計(jì)算性能和內(nèi)存容量隨之線性增大,能處理的圖數(shù)據(jù)也隨之線性增大。由于圖計(jì)算理論基礎(chǔ)BSP框架算法中存在全局通信,因此并行分布式計(jì)算模式中圖分割與聚類就受到了集群網(wǎng)絡(luò)總帶寬的限制,整體性能和所能處理的圖規(guī)模也存在一定的缺陷[7-9]。這類圖計(jì)算系統(tǒng)主要包括同步計(jì)算模型的Pregel及同時(shí)支持同步和異步的系統(tǒng)GraphX和PowerGraph。 Pregel提出了“像頂點(diǎn)一樣思考”(Think Like A Vertex)的圖計(jì)算模式,讓用戶無需考慮并行分布式計(jì)算的細(xì)節(jié),只需要實(shí)現(xiàn)一個(gè)頂點(diǎn)更新函數(shù),讓框架在遍歷頂點(diǎn)時(shí)進(jìn)行調(diào)用即可[10]。Pregel分布式計(jì)算全過程可以描述為讀取數(shù)據(jù)、初始化數(shù)據(jù)計(jì)算、開展計(jì)算和收尾計(jì)算等步驟。 讀取輸入初始化Pregel中的圖 val originalValue=value val value=(message:+ value).min if(originalValue==value) inactive() else neighbours.foreach(sendMessage) 運(yùn)行超級(jí)步,迭代直到計(jì)算結(jié)束 var i=0 while(activeMessages>0 && i val newVerts=g.vertices.innerJoin(messages)(vprog).cache() g=g.outerJoinVertices(newVerts){(vid,old,newOpt).newOpt.getOrElse(old) }.cache() messages=g.mapReduceTriplets(sendMsg,mergeMsg,Some((newVerts, activeDir))).cache() activeMessages=messages.count() i+=1 } 從核心代碼可以看出,Pregel分布式圖計(jì)算框架中每一個(gè)節(jié)點(diǎn)需要發(fā)送大量消息到鄰居節(jié)點(diǎn),同步執(zhí)行容易產(chǎn)生滯后阻塞,從而使其在密率圖的處理中性能欠佳。 PowerGraph將數(shù)據(jù)抽象成Graph結(jié)構(gòu),將算法的執(zhí)行過程抽象成Gather、Apply和Scatter三個(gè)步驟。其并行的核心思想是對(duì)頂點(diǎn)的切分。同一臺(tái)機(jī)器(同一節(jié)點(diǎn))上的所有edge(邊)和vertex(頂點(diǎn))構(gòu)成Local Graph,在每臺(tái)機(jī)器上,存在本地id到全局id的映射表[11-12]。 PageRank在PowerGraph中的公式: R[i]=0.15+ΣwijR[j] PowerGraph圖計(jì)算模式: Gather(j->i):return wji*R[j] sum(a,b):return a+b; Apply(i,Σ):R[i]=0.15+Σ Scatter(i->j):if R[i] changed then trigger j to be recomp 從三個(gè)操作步驟代碼可以看出,PowerGraph共享狀態(tài)異步執(zhí)行將需要大量鎖,且每個(gè)節(jié)點(diǎn)的處理都觸碰到了圖的大部分。對(duì)于單個(gè)節(jié)點(diǎn)來說,元數(shù)據(jù)太大,這種邊分割并行策略對(duì)于處理整個(gè)圖數(shù)據(jù)依然開銷太大。 GraphX是一個(gè)新的(alpha)Spark API,用于圖和并行圖(graph-parallel)的計(jì)算,在Spark之上提供一棧式數(shù)據(jù)解決方案。GraphX描述的是擁有頂點(diǎn)屬性和邊屬性的有向圖。GraphX提供頂點(diǎn)(Vertex)、邊(Edge)、邊三元組(EdgeTriplet)三種視圖。GraphX的各種圖操作也是在這三種視圖上完成的。GraphX將圖數(shù)據(jù)以RDD分布式地存儲(chǔ)在集群的節(jié)點(diǎn)上,使用頂點(diǎn)RDD(VertexRDD)、邊RDD(EdgeRDD)存儲(chǔ)頂點(diǎn)集合和邊集合[13]。 圖構(gòu)造: object GraphLoader { def edgeListFile( sc:SparkContext, path:String, canonicalOrientation:Boolean=false, minEdgePartitions:Int=1) :Graph[Int,Int] } 圖算法: val graph=GraphLoader.edgeListFile(sc,"graphx/data/followers.txt") val ranks=graph.pageRank(0.0001).vertices val users=sc.textFile("graphx/data/users.txt").map { line =>val fields=line.split(",") (fields(0).toLong, fields(1)) } val ranksByUsername=users.join(ranks).map { case (id,(username,rank))=>(username,rank) } println(ranksByUsername.collect().mkString("
")) 從算法看出GraphX是通過調(diào)用GraphLoader.edgeListFile()函數(shù),從邊文件中讀入的。由于邊文件中只存儲(chǔ)了相應(yīng)的頂點(diǎn)編號(hào),沒有頂點(diǎn)對(duì)應(yīng)的屬性。因此需要使用user(VertexId,attr)將頂點(diǎn)信息補(bǔ)全,通信開銷和運(yùn)行時(shí)間都比之前兩者要低。 以上分布式圖計(jì)算模式的算法過程雖然不復(fù)雜,但在實(shí)現(xiàn)過程中,特別在處理海量圖數(shù)據(jù)時(shí)性能瓶頸變得突出,分布式的優(yōu)點(diǎn)在于網(wǎng)絡(luò)內(nèi)的節(jié)點(diǎn)并行計(jì)算處理,但是在分割與聚合圖過程中分布式開銷太多,因此對(duì)分布式算法進(jìn)行兩點(diǎn)優(yōu)化:減少分割聚合重計(jì)算過程中的內(nèi)存長(zhǎng)時(shí)間利用;減少分布式中的網(wǎng)絡(luò)開銷。 GridGraph是一個(gè)用于在單個(gè)機(jī)器上處理大規(guī)模圖數(shù)據(jù)的系統(tǒng)。GridGraph使用預(yù)處理中的第一種細(xì)粒度級(jí)分區(qū)將圖分成一維分區(qū)的頂點(diǎn)塊chunk和二維分區(qū)的邊塊block。在運(yùn)行時(shí)應(yīng)用第二粗粒度級(jí)分區(qū)。通過一種新穎的雙滑動(dòng)窗口方法,GridGraph可以流化邊并應(yīng)用即時(shí)頂點(diǎn)更新,從而減少計(jì)算所需的I/O量。邊的分割還使得能夠進(jìn)行選擇性調(diào)度,可以跳過一些塊以減少不必要的I/O。 GridGraph將頂點(diǎn)分割成P個(gè)均等的chunks。每個(gè)chunk內(nèi)的頂點(diǎn)連續(xù)排列。全部的P×P個(gè)blocks可以看出一個(gè)網(wǎng)格,每個(gè)邊按照如下規(guī)則被放入對(duì)應(yīng)的block中:源頂點(diǎn)決定所在block的行,目的頂點(diǎn)決定所在block的列。圖4展示了GridGraph如何將圖3分割的例子。圖3中有4個(gè)頂點(diǎn),在例子中選擇P=2。{1,2}和{3,4}是兩個(gè)頂點(diǎn)chunk。例如邊(3,2)被分割到Block(2,1),因?yàn)轫旤c(diǎn)3屬于Chunk 2,頂點(diǎn)1屬于Chunk 1。 圖3 圖 例 圖4 圖分割 GridGraph只花費(fèi)很短的時(shí)間就能完成完整的圖分割預(yù)處理,并且同一個(gè)圖生成的網(wǎng)格能很好地用于所有的算法。通過分割,GridGraph能進(jìn)行選擇調(diào)度,減少不必要的邊block(沒有活動(dòng)頂點(diǎn)的邊block)的訪問。這對(duì)于很多迭代算法如BFS和WCC(絕大部分的頂點(diǎn)處于非活躍狀態(tài))在性能提升上具有極大的貢獻(xiàn)。 實(shí)驗(yàn)環(huán)境采用GridGraph對(duì)比前文中分析的三種分布式圖計(jì)算框架,使用i2.4xlarge實(shí)例(包含16個(gè)超線程內(nèi)核,122 GB RAM,4個(gè)800 GB SSD)的GridGraph與具有16個(gè)m2.4xlarge實(shí)例(每個(gè)具有8個(gè)內(nèi)核,68.4 GB RAM,2 840 GB HDD)的集群上的PowerGraph和GraphX進(jìn)行性能比較,測(cè)試結(jié)果如圖5所示。 圖5 性能測(cè)試 實(shí)驗(yàn)結(jié)果表明,GridGraph無縫地?cái)U(kuò)展內(nèi)存容量和磁盤帶寬,單節(jié)點(diǎn)性能是16節(jié)點(diǎn)分布式系統(tǒng)性能的2~3倍,高出一個(gè)數(shù)量級(jí)。 圖是一種抽象人類行為的方法,圖計(jì)算的應(yīng)用才剛剛開始,隨著大數(shù)據(jù)研究和應(yīng)用的發(fā)展,會(huì)出現(xiàn)更多支持“圖計(jì)算”的系統(tǒng)。文中通過對(duì)目前流行的三種分布式圖計(jì)算框架進(jìn)行剖析,同時(shí)對(duì)比基于單機(jī)單節(jié)點(diǎn)圖計(jì)算模式GridGraph的性能應(yīng)用,可以看出該模式使用網(wǎng)格表示的大規(guī)模圖表,通過分割頂點(diǎn)和邊緣分別為1D塊和2D塊,能夠訪問存儲(chǔ)器中的頂點(diǎn)數(shù)據(jù),而不涉及I/O訪問。不過該圖計(jì)算模型依然受到I/O帶寬的限制,框架模型還可以從網(wǎng)格上采用壓縮技術(shù)減少來自I/O帶寬的限制,進(jìn)一步提升圖計(jì)算性能,這些都有待進(jìn)一步研究。2 并行圖計(jì)算模式算法
2.1 Pregel分布式圖計(jì)算框架
2.2 PowerGraph分布式圖計(jì)算框架
2.3 GraphX分布式圖計(jì)算框架
3 優(yōu)化分布式圖計(jì)算結(jié)合GridGraph模式實(shí)驗(yàn)測(cè)試比較
4 結(jié)束語(yǔ)