王曉桐,房俊華,張蓉
(華東師范大學(xué)數(shù)據(jù)科學(xué)與工程研究院上海高可信計(jì)算重點(diǎn)實(shí)驗(yàn)室,上海200062)
分布“可擴(kuò)展數(shù)據(jù)流連接算法
王曉桐,房俊華,張蓉
(華東師范大學(xué)數(shù)據(jù)科學(xué)與工程研究院上海高可信計(jì)算重點(diǎn)實(shí)驗(yàn)室,上海200062)
Join-Matrix是一種高性能的連接矩陣模型,方B部署于分布式環(huán)境下,支持任意連接謂詞的數(shù)據(jù)流連接操作.由于采取隨機(jī)分發(fā)元組作為路由策略,Join-Matrix可利用對(duì)元組內(nèi)容的不敏感性來(lái)有效抵御數(shù)據(jù)傾斜.為了實(shí)現(xiàn)工作節(jié)點(diǎn)的負(fù)載均衡以及網(wǎng)絡(luò)傳輸代價(jià)的最小化,基于連接矩陣模型設(shè)計(jì)一種高效的數(shù)據(jù)劃分方案尤為重要.針對(duì)數(shù)據(jù)流連接處理,本文設(shè)計(jì)并實(shí)現(xiàn)了一種新穎的連接算子,可靈活地進(jìn)行劃分方案的自適應(yīng)調(diào)整,以應(yīng)對(duì)實(shí)時(shí)動(dòng)態(tài)變化的數(shù)據(jù)分布.具體來(lái)說(shuō),我們根據(jù)數(shù)據(jù)流流量的采樣信息和系統(tǒng)額定負(fù)載,通過(guò)一個(gè)輕量級(jí)的決策器制定出一個(gè)數(shù)據(jù)劃分方案和相應(yīng)的數(shù)據(jù)遷移計(jì)劃,在保證輸出結(jié)果完整性與正確性的情況下,實(shí)現(xiàn)遷移代價(jià)的最小化.本文在多種不同的數(shù)據(jù)集上進(jìn)行了大量對(duì)比實(shí)驗(yàn),結(jié)果證明,在資源利用率、系統(tǒng)吞吐率與時(shí)間延遲等方面,該連接算子較對(duì)比系統(tǒng)具有更高的性能體現(xiàn).
數(shù)據(jù)流連接;Join-Matrix;數(shù)據(jù)劃分;分布式計(jì)算
隨著在線實(shí)時(shí)分析連續(xù)數(shù)據(jù)流的需求日益增多,處理時(shí)刻變化數(shù)據(jù)流的新型應(yīng)用越來(lái)越普遍,包括傳感器網(wǎng)絡(luò)、金融數(shù)據(jù)在線分析和網(wǎng)絡(luò)入侵檢測(cè)等.諸如此類(lèi)的應(yīng)用具有以下特征:①在海量數(shù)據(jù)上執(zhí)行包含復(fù)雜謂詞的連接操作;②在保持高效率和快速響應(yīng)時(shí)間的同時(shí)進(jìn)行實(shí)時(shí)數(shù)據(jù)分析;③需要維持大量依賴(lài)于歷史數(shù)據(jù)的狀態(tài)信息.因此,為了提高數(shù)據(jù)流系統(tǒng)的處理性能,設(shè)計(jì)高效的數(shù)據(jù)流連接算法尤為重要.
研究者對(duì)數(shù)據(jù)流連接算法已投入了大量相關(guān)的研究工作,提出了若干集中式連接算法[1-3].這些連接算法均在一個(gè)中心節(jié)點(diǎn)對(duì)數(shù)據(jù)流進(jìn)行連接處理,不能高效處理數(shù)據(jù)量巨大的流式數(shù)據(jù).由于目前主流的數(shù)據(jù)流處理系統(tǒng)均是分布式的,將連接操作分布式處理更符合數(shù)據(jù)流系統(tǒng)的特點(diǎn).因此分布式數(shù)據(jù)流連接算法應(yīng)運(yùn)而生.大多數(shù)分布式連接算法主要針對(duì)等值連接,處理高選擇性的θ連接性能欠佳.除此之外,分布式算法大多采用哈希函數(shù)進(jìn)行數(shù)據(jù)劃分,對(duì)數(shù)據(jù)內(nèi)容的不敏感性導(dǎo)致不能靈活地進(jìn)行系統(tǒng)結(jié)構(gòu)的擴(kuò)展.
本文的主要研究目標(biāo)是基于連接矩陣模型,設(shè)計(jì)并實(shí)現(xiàn)一種新穎的分布式數(shù)據(jù)流連接算法,旨在提高連接矩陣的可擴(kuò)展性與靈活性,從以下兩方面實(shí)現(xiàn):①探索適當(dāng)?shù)臄?shù)據(jù)劃分方案,以充分利用系統(tǒng)資源;②設(shè)計(jì)高效的狀態(tài)重分配和數(shù)據(jù)路由策略,以降低自適應(yīng)調(diào)整代價(jià)和網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo).文獻(xiàn)[4]指出目前已有的自適應(yīng)技術(shù)只依賴(lài)于啟發(fā)式模型,缺乏理論證明.本文繼承了傳統(tǒng)數(shù)據(jù)劃分方案的特性,并提出了相應(yīng)的改進(jìn)措施.
本文的主要貢獻(xiàn)是:①基于連接矩陣設(shè)計(jì)一種高效的數(shù)據(jù)劃分方案,打破節(jié)點(diǎn)個(gè)數(shù)的規(guī)整性限制,可根據(jù)數(shù)據(jù)動(dòng)態(tài)分布靈活地增刪物理計(jì)算節(jié)點(diǎn),提高系統(tǒng)架構(gòu)的可擴(kuò)展性與靈活性;②頻繁進(jìn)行自適應(yīng)調(diào)整的策略會(huì)導(dǎo)致巨大的網(wǎng)絡(luò)傳輸成本,而保守策略不能根據(jù)數(shù)據(jù)動(dòng)態(tài)變化進(jìn)行自適應(yīng)調(diào)整,從而降低系統(tǒng)的處理性能.為了在二者之間達(dá)到平衡,本文提出一個(gè)在線算法,高效地決定何時(shí)探索和觸發(fā)新的劃分方案;③提出一種統(tǒng)一的、位置感知的遷移機(jī)制,實(shí)現(xiàn)遷移代價(jià)的最小化;④傳統(tǒng)的自適應(yīng)技術(shù)[5-6]以阻塞方式進(jìn)行狀態(tài)數(shù)據(jù)重分配,本文以非阻塞的方式在進(jìn)行數(shù)據(jù)遷移的同時(shí)處理新流入的元組;⑤通過(guò)在多種不同數(shù)據(jù)集上的大量對(duì)比實(shí)驗(yàn)證明,本文提出的連接算子具有良好的性能.
近年來(lái),研究人員利用Join-Matrix矩陣模型進(jìn)行分布式連接查詢(xún)處理,在類(lèi)似MapReduce的系統(tǒng)與數(shù)據(jù)流系統(tǒng)均有涉足.Join-Matrix模型將兩個(gè)數(shù)據(jù)集間的連接操作建模成一個(gè)矩陣,矩陣的每一條邊分別代表一個(gè)數(shù)據(jù)集,每個(gè)矩陣單元代表一個(gè)潛在的連接輸出結(jié)果. Stomos等人在文獻(xiàn)[7]中首次引進(jìn)連接矩陣的概念,在FR算法[8]的基礎(chǔ)上提出了“對(duì)稱(chēng)片段與復(fù)制”算法(symmetric fragment and replicate),以解決FR算法帶來(lái)的計(jì)算代價(jià)與通信代價(jià)龐大的問(wèn)題.在MapReduce的編程框架下,文獻(xiàn)[9]基于連接矩陣提出了兩種數(shù)據(jù)劃分方案,分別是1-Bucket與M-Bucket.1-Bucket采取隨機(jī)分發(fā)元組的路由策略,即內(nèi)容不敏感,在輸出結(jié)果方面可以很好地實(shí)現(xiàn)負(fù)載均衡,但由于過(guò)多地輸入元組復(fù)制存儲(chǔ),在處理低選擇性的連接操作時(shí)性能欠佳.另一方面,M-Bucket根據(jù)輸入元組的內(nèi)容進(jìn)行數(shù)據(jù)劃分,即內(nèi)容敏感,盡管解決了元組冗余存儲(chǔ)的問(wèn)題,但可能導(dǎo)致某些計(jì)算節(jié)點(diǎn)出現(xiàn)過(guò)載的現(xiàn)象.
與本文設(shè)計(jì)思路最相似的研究是Elseidy等人提出的Dynamic連接算子[10].Dynamic連接算子采用“網(wǎng)格劃分方案”將連接矩陣劃分成2n(n∈N*)個(gè)面積相等的區(qū)域,并采用隨機(jī)路由策略分發(fā)輸入元組.由于要維持矩陣的結(jié)構(gòu)特性,當(dāng)架構(gòu)需要進(jìn)行擴(kuò)展或縮減時(shí),必須同時(shí)增加或刪除一行或一列的所有處理單元,由此引發(fā)遷移代價(jià)的劇增和資源利用率的降低,算子結(jié)構(gòu)的靈活性與可擴(kuò)展性也深受影響.為了解決這個(gè)問(wèn)題,本文設(shè)計(jì)了一種更為靈活的數(shù)據(jù)劃分方案,達(dá)到更好的效果.
2.1 預(yù)備知識(shí)
Join-Matrix以矩陣的形式處理R??S,矩陣的每一條邊代表一條數(shù)據(jù)流,矩陣單元代表潛在的連接輸出結(jié)果.如圖1(a)所示,在連接矩陣中進(jìn)行不等值連接操作,圖中的數(shù)字代表連接屬性,7色的單元格代表符合連接謂詞的輸出結(jié)果.基于連接矩陣M的數(shù)據(jù)劃分方案將矩陣切分成n×m個(gè)面積相等的處理單元Cij,每個(gè)處理單元分配一臺(tái)物理計(jì)算節(jié)點(diǎn),并存儲(chǔ)數(shù)據(jù)流的子集〈Ri,Sj〉,其中i∈[0,n一1],j∈[0,m一1],并用[b,e]代表子數(shù)據(jù)集對(duì)于數(shù)據(jù)流的位置范圍.如圖1(b)所示,將圖1(a)中的矩陣切分成2×4個(gè)處理單元,每個(gè)處理單元分別存儲(chǔ)1/2的R流數(shù)據(jù)和1/4的S流數(shù)據(jù).
圖1 連接矩陣及劃分方案示例Fig.1Example of join-matrix and partitioning scheme
處理θ連接操作的代價(jià)主要與系統(tǒng)的內(nèi)存開(kāi)銷(xiāo)、CPU計(jì)算成本以及網(wǎng)絡(luò)通信代價(jià)有關(guān),其中CPU的計(jì)算成本與連接矩陣的計(jì)算區(qū)域面積|R|·|S|成正比(|R|和|S|分別代表兩條數(shù)據(jù)流的流量,即元組的數(shù)量),與選取的數(shù)據(jù)劃分方案無(wú)關(guān),即獨(dú)立于矩陣的行數(shù)n和列數(shù)m.根據(jù)文獻(xiàn)[10],內(nèi)存開(kāi)銷(xiāo)與單個(gè)處理單元的半周長(zhǎng)|Ri|+|Sj|成比例,而|Ri|+|Sj|取決于矩陣的行數(shù)與列數(shù).對(duì)于網(wǎng)絡(luò)通信代價(jià)同樣成立.因此,本文旨在尋求合適的數(shù)據(jù)劃分方案n×m,使得系統(tǒng)的資源使用量最低.假設(shè)單個(gè)處理單元的額定內(nèi)存大小為V,則本文的目標(biāo)可以形式化定義成以下優(yōu)化問(wèn)題:
2.2 劃分方案
由于連接矩陣的行數(shù)與列數(shù)決定了內(nèi)存開(kāi)銷(xiāo),關(guān)于矩陣的面積與周長(zhǎng),我們已知兩個(gè)常識(shí):①給定面積的所有矩陣中,正方形的周長(zhǎng)最小;②給定周長(zhǎng)的所有矩陣中,正方形的面積最大.基于上述常識(shí),我們得出以下定理.
證明首先假設(shè)單個(gè)處理單元的CPU計(jì)算資源為定值,為了確保兩條流的任意元組均可相遇,則R??S的計(jì)算復(fù)雜度為O(|R|·|S|).當(dāng)時(shí),系統(tǒng)的內(nèi)存使用量最小;其次假設(shè)單個(gè)處理單元的內(nèi)存空間是定值,當(dāng)時(shí),連接矩陣使用的處理單元總數(shù)最小.由于網(wǎng)絡(luò)傳輸代價(jià)與內(nèi)存開(kāi)銷(xiāo)相關(guān),因此定理1成立.
根據(jù)定理1,如果數(shù)據(jù)流流量|R|和|S|均可被Vh整除,則由此生成的數(shù)據(jù)劃分方案是最優(yōu)的.但是大多數(shù)情況下,數(shù)據(jù)流流量不能被Vh整除,考慮到矩陣的行數(shù)與列數(shù)必須為整數(shù),我們令,則連接矩陣使用的處理單元總數(shù)N為:
由于數(shù)據(jù)流流量不能被Vh整除,在裝載輸入元組的過(guò)程中,矩陣最后一行或者一列的處理單元中會(huì)產(chǎn)生數(shù)據(jù)碎片,我們稱(chēng)這些處理單元為“碎片單元”.我們假設(shè)V=8 GB,|R|= 9 GB,|S|=9 GB,則R??S對(duì)應(yīng)的計(jì)算區(qū)域如圖1(a)所示.根據(jù)公式(2),將矩陣劃分成9個(gè)處理單元,各個(gè)處理單元存儲(chǔ)數(shù)據(jù)的情況分別是C00=〈4 GB,4 GB〉,C01=〈4 GB,4 GB〉, C02=〈4 GB,1 GB〉,C10=〈4 GB,4 GB〉,C11=〈4 GB,4 GB〉,C12=〈4 GB,1 GB〉,C20=〈1 GB,4 GB〉,C21=〈1 GB,4 GB〉,C22=〈1 GB,1 GB〉.顯然,C02、C12、C20、C21和C22均為碎片單元,因?yàn)榇鎯?chǔ)的R流和S流數(shù)據(jù)總量低于額定內(nèi)存空間.
圖2 數(shù)據(jù)劃分方案Fig.2Partitioning scheme
為了充分利用系統(tǒng)資源,實(shí)現(xiàn)處理單元之間的負(fù)載均衡十分重要.我們將兩條數(shù)據(jù)流定義為主流P和副流D以作區(qū)分.主流P可以為數(shù)據(jù)流R或者S.首先保證主流P的元組數(shù)據(jù)分配到足夠的內(nèi)存空間,將P切分成Pγ個(gè)子集分發(fā)到處理單元中;其次將單個(gè)處理單元中剩余的內(nèi)存分配給副流D的元組數(shù)據(jù),則劃分副流D得到的子集個(gè)數(shù).因此,處理單元總數(shù)N為:
算法1闡述了基于連接矩陣制定數(shù)據(jù)劃分方案的具體過(guò)程.首先,將Pγ中的四個(gè)元素依次代入等式(3)計(jì)算出對(duì)應(yīng)的處理單元個(gè)數(shù)Ni(i 6 4),選擇值最小的Ni作為處理單元總數(shù),并將對(duì)應(yīng)的賦值給于Pγ和Dγ(第1~5行).其次根據(jù)主流P的流量和子集個(gè)數(shù)Pγ,計(jì)算出連接矩陣的行數(shù)n和列數(shù)m:如果,則數(shù)據(jù)流R為主流P,n=Pγ,m=Dγ;否則數(shù)據(jù)流S為主流P,m=Pγ,n=Dγ(第6~10行).
2.3 遷移計(jì)劃
在進(jìn)行劃分方案的切換之前,需要先確定新舊矩陣中處理單元的對(duì)應(yīng)關(guān)系.假設(shè)Cij和Ckl分別是舊矩陣M0和新矩陣Mn中的處理單元,我們利用一個(gè)相關(guān)系數(shù)來(lái)衡量?jī)蓚€(gè)處理單元Cij和Ckl之間數(shù)據(jù)集重疊度,給出如下定義:
給定矩陣M0和Mn,定義處理單元之間的關(guān)系映射條目.更新處理單元映射關(guān)系表可分為兩步驟:①枚舉出所有可能的npi;②選取值最大的npi作為最終條目插入到關(guān)系映射表NP.
遷移計(jì)劃決定了矩陣變換期間數(shù)據(jù)是如何在處理單元之間重新分配的.為了方B描述,下面我們將只討論R流的數(shù)據(jù)遷移,對(duì)于S流采取類(lèi)似的操作.我們將需要遷入處理單元Ckl的 R流數(shù)據(jù)集定義如下:
圖3 數(shù)據(jù)遷移示例Fig.3Example of data migration
3.1 實(shí)驗(yàn)環(huán)境
實(shí)驗(yàn)設(shè)備:22個(gè)處理節(jié)點(diǎn)的刀片機(jī)服務(wù)器集群,單個(gè)節(jié)點(diǎn)有2個(gè)四核四線程處理器,型號(hào)為Intel Xeon E5335,主頻2.00 GHZ,并配有共計(jì)16 GB的RAM以及2 TB的硬盤(pán).所有節(jié)點(diǎn)運(yùn)行CentOS 6.5 Linux操作系統(tǒng),Apache Storm 0.10.0[12]以及Java 1.7.0.
數(shù)據(jù)集:使用TPC-benchmark[13]的數(shù)據(jù)生成器dbgen生成不同規(guī)模的數(shù)據(jù)集.我們對(duì)這些數(shù)據(jù)集進(jìn)行預(yù)處理,即將其調(diào)整為在連接屬性上具有Zipf分布的形式,通過(guò)參數(shù)z調(diào)整數(shù)據(jù)傾斜程度,默認(rèn)情況下,我們將數(shù)據(jù)集的傾斜度設(shè)置為1.
查詢(xún)語(yǔ)句:我們使用[10]中的等值查詢(xún)語(yǔ)句EQ5和范圍查詢(xún)語(yǔ)句BNCI.其中EQ5是[10]重定義的TPC-H中Q5查詢(xún)中代價(jià)較高的幾個(gè)連接謂詞組成的查詢(xún)語(yǔ)句;BNCI是按照某一屬性范圍查找其在另一個(gè)數(shù)據(jù)集中的匹配記錄.
3.2 評(píng)估指標(biāo)
我們將通過(guò)以下四個(gè)指標(biāo)對(duì)系統(tǒng)的資源利用率和處理性能進(jìn)行評(píng)估:①處理單元數(shù):系統(tǒng)運(yùn)行過(guò)程中,連接算子使用到的處理單元的總數(shù),單個(gè)處理單元分配額定大小的內(nèi)存空間;②吞吐率:單位時(shí)間內(nèi)系統(tǒng)成功接收并處理的元組數(shù)量;③遷移量:新舊連接矩陣進(jìn)行轉(zhuǎn)換期間,需要拷貝和移動(dòng)的元組總量;④計(jì)劃耗時(shí):根據(jù)當(dāng)前系統(tǒng)的工作負(fù)載制定數(shù)據(jù)劃分方案、更新單元映射表以及生成遷移計(jì)劃的總耗時(shí).
3.3 對(duì)比系統(tǒng)
我們使用了三種不同的連接算子來(lái)進(jìn)行對(duì)比實(shí)驗(yàn):①M(fèi)FM.本文提出的自適應(yīng)連接算子,根據(jù)等式3計(jì)算出最優(yōu)的連接矩陣及數(shù)據(jù)劃分方案;②Dynamic.文獻(xiàn)[10]設(shè)計(jì)的連接算子,限制連接矩陣個(gè)數(shù)必須為2的冪次方個(gè),以單個(gè)處理單元×4的形式進(jìn)行矩陣的擴(kuò)展;③Readj.文獻(xiàn)[11]設(shè)計(jì)的連接算子,以key為粒度,通過(guò)一個(gè)哈希函數(shù)重新調(diào)整各處理單元的工作負(fù)載以實(shí)現(xiàn)負(fù)載均衡.
3.4 結(jié)果與分析
實(shí)驗(yàn)在全歷史模式下進(jìn)行,并通過(guò)調(diào)整輸入數(shù)據(jù)的傾斜度驗(yàn)證連接算法的靈活性和自適應(yīng)性.設(shè)置V=8·105,并連續(xù)地將6·106條元組數(shù)據(jù)裝載入系統(tǒng)中.圖4展示了執(zhí)行BNCI時(shí)處理單元數(shù)與遷移代價(jià)的變化趨勢(shì).隨著數(shù)據(jù)的不斷流入,Dynamic算子占用的處理單元數(shù)大幅度遞增,導(dǎo)致消耗的內(nèi)存空間也急劇增加.相反,MFM根據(jù)當(dāng)前系統(tǒng)的負(fù)載情況按需分配資源,占用的處理單元數(shù)遠(yuǎn)遠(yuǎn)少于Dynamic.相應(yīng)地,為了維持連接矩陣的結(jié)構(gòu)特性,Dynamic需要進(jìn)行大規(guī)模的數(shù)據(jù)備份;而MFM算子使用較少的處理單元數(shù),充分利用系統(tǒng)資源.因此, Dynamic算子在數(shù)據(jù)遷移期間產(chǎn)生的遷移代價(jià)遠(yuǎn)遠(yuǎn)超過(guò)MFM.
圖4 BNCI無(wú)窗口模式Fig.4Full-history join with BNCI
圖5 EQ5全歷史模式Fig.5Full-history join with EQ5
為了保證系統(tǒng)的負(fù)載均衡,對(duì)于單個(gè)處理單元和單位時(shí)間間隔t,定義均衡度標(biāo)識(shí),其中為所有處理單元的平均負(fù)載.在本組實(shí)驗(yàn)中,執(zhí)行查詢(xún)語(yǔ)句EQ5,并設(shè)置θt6 0.05.如圖5(a)所示,Readj的計(jì)劃耗時(shí)高于其余兩種連接算子三個(gè)數(shù)量級(jí).究其原因可知, Readj通過(guò)一個(gè)哈希函數(shù)調(diào)整所有處理單元中的工作負(fù)載,因此在進(jìn)行擴(kuò)容操作時(shí),Readj需要重新計(jì)算全局的均衡狀態(tài),而其余兩種連接算子均采用內(nèi)容不敏感性的隨機(jī)路由策略,無(wú)需進(jìn)行平衡調(diào)度.圖5(b)給出了三種連接算子在不同數(shù)據(jù)傾斜程度下的吞吐率.一方面,隨著傾斜參數(shù)的遞增,由于計(jì)劃耗時(shí)長(zhǎng),Readj的吞吐率呈現(xiàn)遞減趨勢(shì).另一方面,盡管Dynamic連接算子占用的處理單元遠(yuǎn)多于MFM,但是由于其龐大的數(shù)據(jù)遷移量,MFM的吞吐率略高.
為在數(shù)據(jù)流系統(tǒng)上高效地執(zhí)行分布式θ連接操作,本文基于連接矩陣模型提出可靈活地進(jìn)行自適應(yīng)調(diào)整的連接算法,利用對(duì)其內(nèi)容不敏感性抵御數(shù)據(jù)傾斜,根據(jù)當(dāng)前系統(tǒng)負(fù)載按需分配資源,采用非阻塞的方式處理數(shù)據(jù)遷移并保證連接結(jié)果的完整性與正確性.實(shí)驗(yàn)證明,對(duì)比目前已有的連接算法,本文提出的連接算法性能更為優(yōu)越且穩(wěn)定.未來(lái)的工作將會(huì)考慮對(duì)連接矩陣模型進(jìn)一步優(yōu)化,打破矩陣單元個(gè)數(shù)規(guī)整性的限制以實(shí)現(xiàn)更為優(yōu)良的性能.
[1]DITTRICH J-P,SEEGER B,TAYLOR D S,et al.Progressive merge join:A generic and non-blocking sort-based join algorithm[C]//Proceedings of the 28th VLDB Conference.2002:299-310.
[2]URHAN T,FRANKLIN M J.XJoin:A reactively-scheduled pipelined join operator[J].IEEE Data Eng Bull, 2000,23(2):27-33.
[3]WANG S,RUNDENSTEINER E.Scalable stream join processing with expensive predicates:Workload distribution and adaptation by time-slicing[C]//Proceedings of the 12th Conference on EDBT.2009:299-310.
[4]GOUNARIS A,TSAMOURA E,MANOLOPOULOS Y.Adaptive query processing in distributed settings[J]. Intelligent Systems Reference Library,2013,36:211-236.
[5]LIU B,JBANTOVA M,RUNDENSTEINER E A.Optimizing state-intensive non-blocking queries using run-time adaptation[C]//Proceedings of the 2007 IEEE 23rd ICDEW.IEEE,2007:614-623.
[6]PATON N W,BUENABAD-CHAVEZ J,CHEN M,et al.Autonomic query parallelization using non-dedicated computers:An evaluation of adaptivity options[J].The VLDB Journal,2009,18(1):119-140.
[7]STAMOS J W,YOUNG H C.A symmetric fragment and replicate algorithm for distributed joins[J].IEEE Transactions on Parallel&Distributed Systems,1993,4(12):1345-1354.
[8]EPSTEIN R,STONEBRAKER M,WONG E.Distributed query processing in a relational data base system [C]//Proceedings of ACM SIGMOD Conference on Management of Data.1978:169-180.
[9]OKCAN A,RIEDEWALD M.Processing theta-joins using MapReduce[C]//Proceedings of ACM SIGMOD Conference on Management of Data.2011:949-960.
[10]ELSEIDY M,ELGUINDY A.Scalable and adaptive online joins[J].The VLDB Endowment,2014,7(6):441-452.
[11]GEDIK B.Partitioning functions for stateful data parallelism in stream processing[J].The VLDB Journal,2013, 23(4):517-539.
[12]Apache storm[EB/OL].[2016-06-10].http://storm.apache.org.
[13]The TPC-H benchmark[EB/OL].[2016-06-10].http://www.tpc.org/tpch.
(責(zé)任編輯:林磊)
Distributed and scalable stream join algorithm
WANG Xiao-tong,FANG Jun-hua,ZHANG Rong
(Institute for Data Science and Engineering,Shanghai Key Laboratory of Trustworthy Computing,East China Normal University,Shanghai200062,China)
Join-Matrix is a high-performance model for stream join processing in a parallel shared-nothing environment,which supports arbitrary join operations and is resilient to data skew for taking random tuple distribution as its routing policy.To evenly distribute workload and minimize network communication cost,designing an efficient partitioning policy on the matrix is particularly essential.In this paper,we propose a novel stream join operator that continuously adjust its partitioning scheme to real-time data dynamics.Specifically,based on the sample statistics of streams and rated load of each physical machine,a lightweight scheme generator produces a partitioning scheme; then the corresponding solutions for state relocation are generated by a migration plan generator to minimize migration cost while ensuring result correctness.Our experiments on different kinds of data sets demonstrate that our operator outperforms the static-of-the-artstrategies in resource utilization,throughput and system latency.
stream join processing;Join-Matrix;partitioning scheme;distributed computing
TP391
A
10.3969/j.issn.1000-5641.2016.05.010
1000-5641(2016)05-0081-08
2016-05
國(guó)家863計(jì)劃項(xiàng)目(2015AA015307);國(guó)家自然科學(xué)基金重點(diǎn)項(xiàng)目(61232002,61332006);國(guó)家自然科學(xué)基金(61432006)
王曉桐,女,碩士研究生,研究方向?yàn)閿?shù)據(jù)流處理.E-mail:51164500121@stu.ecnu.edu.cn.
張蓉,女,博士,副教授,研究方向?yàn)榉植际綌?shù)據(jù)管理.E-mail:rzhang@sei.ecnu.edu.cn.
華東師范大學(xué)學(xué)報(bào)(自然科學(xué)版)2016年5期