郭乃網(wǎng) 覃 晟 談子敬 曹滿亮
1(國網(wǎng)上海市電力公司電力科學(xué)研究院 上海 200437)
2(復(fù)旦大學(xué) 上海 200433)
隨著數(shù)字化技術(shù)的廣泛應(yīng)用,為了保證在市場經(jīng)濟(jì)環(huán)境下系統(tǒng)能安全、可靠運(yùn)行,各種管理信息系統(tǒng)(MIS)、地理信息系統(tǒng)(GIS)、電網(wǎng)運(yùn)行的實(shí)時(shí)信息系統(tǒng)等的廣泛應(yīng)用,產(chǎn)生了海量的工商業(yè)運(yùn)行數(shù)據(jù)。面對這種海量數(shù)據(jù)的增加,數(shù)據(jù)挖掘技術(shù)可以發(fā)現(xiàn)更深層次的規(guī)律。數(shù)據(jù)質(zhì)量問題是數(shù)據(jù)挖掘過程中經(jīng)常面臨的挑戰(zhàn)。目前的研究大多著眼于數(shù)據(jù)挖掘算法的探討,而忽視了數(shù)據(jù)分析前對數(shù)據(jù)質(zhì)量處理的研究。一些比較成熟的算法對其處理的數(shù)據(jù)集合都有一定的要求,比如數(shù)據(jù)完整性好、數(shù)據(jù)的冗余性少、屬性之間的相關(guān)性小。然而,實(shí)際系統(tǒng)中的數(shù)據(jù)通常存在各種問題,很少能直接滿足數(shù)據(jù)挖掘算法的要求。同時(shí)海量的數(shù)據(jù)中無意義和錯誤的成分也普遍存在,嚴(yán)重影響了數(shù)據(jù)挖掘算法的執(zhí)行效率。因此,如何提高數(shù)據(jù)質(zhì)量已經(jīng)成為數(shù)據(jù)挖掘技術(shù)應(yīng)用中的關(guān)鍵問題,引起了學(xué)術(shù)界的廣泛關(guān)注[1-2]。
基于數(shù)據(jù)約束的數(shù)據(jù)修復(fù)技術(shù)通過將數(shù)據(jù)集修改為滿足數(shù)據(jù)約束的形式,來提高和改進(jìn)數(shù)據(jù)質(zhì)量。本文重點(diǎn)研究基于字典序次序依賴(Lexicographical Order Dependency)的數(shù)據(jù)修復(fù)問題。字典序次序依賴[1-2]定義屬性列表間的次序關(guān)系,因而在SQL的Order by等優(yōu)化中取得了良好的效果。要充分發(fā)揮字典序次序依賴的作用,數(shù)據(jù)集必須滿足給定的依賴定義。這提示我們需要通過數(shù)據(jù)修復(fù)將存在錯誤和誤差的現(xiàn)實(shí)數(shù)據(jù)修復(fù)成為正確的滿足約束的形式。然而,面對大量數(shù)據(jù)的修復(fù)問題,已有的集中式修復(fù)算法在運(yùn)行效率方面有所欠缺。特別是在面對電力數(shù)據(jù)等海量規(guī)模的數(shù)據(jù)時(shí),已有方法難以在可接受的時(shí)間內(nèi)完成修復(fù)任務(wù)。通過分布式修復(fù)技術(shù),我們一方面旨在處理由于單機(jī)的存儲限制,無法有效處理的大數(shù)據(jù)集,也希望利用分布式的計(jì)算方法,通過利用更多計(jì)算資源來提升算法在大數(shù)據(jù)下的運(yùn)行效率。
針對海量字典序次序依賴的數(shù)據(jù)修復(fù)問題,本文提出一種基于Spark分布式計(jì)算框架的字典次序依賴數(shù)據(jù)修復(fù)算法,為數(shù)據(jù)質(zhì)量的提高提供一種高效可行的處理方式。該方法充分利用分布式計(jì)算的特點(diǎn),通過將數(shù)據(jù)和計(jì)算分布到不同計(jì)算節(jié)點(diǎn),以提高運(yùn)行效率。
本文提出的分布式計(jì)算在通用的真實(shí)數(shù)據(jù)集上進(jìn)行實(shí)驗(yàn)驗(yàn)證。實(shí)驗(yàn)結(jié)果顯示本文算法可以有效地提高計(jì)算效率:實(shí)際計(jì)算時(shí)間消耗與數(shù)據(jù)規(guī)模成亞線性關(guān)系;并行度的提高顯著地減少了計(jì)算時(shí)間消耗。
字典序次序依賴是定義在屬性列之上,在依賴的左右兩側(cè),不像函數(shù)依賴等那樣是屬性的集合,而是屬性的有序列,因此字典序次序依賴性質(zhì)非常特別。我們首先通過表1中的例子,來引出字典序次序依賴的具體內(nèi)容。
表1 考慮如下與環(huán)球旅行有關(guān)的數(shù)據(jù)表r
根據(jù)語義,可以得出如下三個(gè)次序依賴:
第一條表示,隨著編號的遞增,航班的日期也會遞增;第二條表示,航班的日期遞增,會使得總消費(fèi)遞增;而第三條則是函數(shù)依賴“城市決定國家”City→Country的次序依賴表述形式,將函數(shù)依賴的左側(cè)屬性作為字典序次序依賴的左側(cè)屬性列,而將函數(shù)依賴的右側(cè)屬性加在左側(cè)屬性之后,作為次序依賴的右側(cè)屬性列,則可以得到由字典序次序依賴表示的函數(shù)依賴。
根據(jù)這三條字典序次序依賴,可以看出表1中的部分元組并不符合這三條次序依賴。例如第五行的[Month,Day]與其他四條元組都同時(shí)違背了OD1和OD2。第三行和第四行的City相同,而Country不同,在OD3上產(chǎn)生了沖突。若將第五行的[Month]改為3,并將第四行的[City]改為BUR,則三條字典序次序依賴都得到滿足。
修復(fù)過程中修改一個(gè)元組中一個(gè)屬性的值,需要的代價(jià)為1。為了保證修復(fù)后,左側(cè)屬性值相同的元組右側(cè)屬性值也相同,可以在左側(cè)屬性上構(gòu)建等價(jià)類,即所有在A上相同值的元組,構(gòu)成一個(gè)等價(jià)類,并且從小到大可以賦予編號ecno。在同一個(gè)等價(jià)類中,右側(cè)屬性B上的值可能不同。如果有w個(gè)元組的B值相同,則在最終修復(fù)后,這w個(gè)元組的B值要么都被修改為同一個(gè)新值,要么不變,前者對應(yīng)修復(fù)代價(jià)為w,后者對應(yīng)修復(fù)代價(jià)為0。而根據(jù)次序依賴的定義,最終需要在A上遞增時(shí),B上也要遞增。A上的遞增可以由ecno來反映,而B就無法簡單用編號來衡量或者做標(biāo)記,因此B上的值需要保留在具體的修復(fù)過程之中。算法第一部分,需要將分布式的數(shù)據(jù),轉(zhuǎn)變?yōu)橛上嗤笥覍傩灾档脑M組成的最小單元minUnit的集合,每個(gè)minUnit中需要含有三個(gè)元素:
(1)value=v:表示這些元組的B值。
(2)value=e:表示這些元組的A值在從小到大排列下的序號。
(3)weight=w:表示這些元組的個(gè)數(shù),即修復(fù)這部分元組需要的代價(jià)。
其計(jì)算流程如圖1所示。輸入:數(shù)據(jù)表(這里每個(gè)元組由兩個(gè)屬性值A(chǔ)和B,用空格分隔);輸出:分布式下的minUnit。示例參加見圖2。
圖1 Spark下disMake_minUnit流程
圖2 Spark下disMake_minUnit示例圖
Spark的特點(diǎn)是對數(shù)據(jù)不斷進(jìn)行變化操作,因此偽代碼和圖1都較抽象,因此這里給出一個(gè)示例圖。圖2給出RDD的數(shù)據(jù)流的詳細(xì)變化過程,并對每一步操作作出解釋。
(1)-(2):從HDFS中讀入元組,并轉(zhuǎn)化成鍵值對(ai,bi)的形式。
(3):將相同a值的元組進(jìn)行聚合,轉(zhuǎn)化為(ai,[bi1,bi2,…])。
(4)-(5):根據(jù)a值排序,并給出等價(jià)類編號,轉(zhuǎn)化為(ai,[bi1,bi2,…],ecnoi),此時(shí)ecnoi的大小已經(jīng)可以代替a的大小順序。
(6):重新將聚合的bi數(shù)組拆開,附上ecnoi的信息,轉(zhuǎn)化為(ecnoi,bij,1),最后的1為了后一步的統(tǒng)計(jì)做準(zhǔn)備。
(7):對所有相同的(ecno,b)的元組進(jìn)行聚合,并且更改內(nèi)部順序?yàn)?b,ecno,weight)的格式,由于這里的b對應(yīng)的是minUnit的value值,所以此時(shí)已經(jīng)產(chǎn)生了需要的minUnit單元格式(value,ecno,weight)。
(8):最后對所有的minUnit進(jìn)行排序,先根據(jù)value值從小到大,value相同的則根據(jù)ecno從小到大。具有相同value和ecno的元組會被匯聚在一起,不會存在兩個(gè)minUnit的value和ecno均相同,minUnit的順序固定且唯一。
至此,minUnit分布式存儲于各個(gè)Partitions之中等待后續(xù)使用。
算法的第二步是在構(gòu)建的minUnit中找到若干個(gè)“可靠的”數(shù)據(jù),以方便在第三步中根據(jù)不動單元進(jìn)行數(shù)據(jù)分割,在分割后的每一段數(shù)據(jù)上進(jìn)行修復(fù)?!翱煽康摹睌?shù)據(jù)是指“極有可能未被污染的干凈數(shù)據(jù)”,因此有以下特征:
(1) 所有不動單元上的value隨著ecno單調(diào)遞增,即滿足次序依賴。
(2) 每一個(gè)不動單元的權(quán)重盡可能大。
(3) 不動單元在整個(gè)數(shù)據(jù)集上的分布盡可能平均。
集中式算法可以采用動態(tài)規(guī)劃,而在分布式啟發(fā)式算法中,我們采用遞歸算法和二分算法進(jìn)行替代。
計(jì)算流程如圖3所示。輸入:分成p個(gè)部分的minUnit,均衡系數(shù)α,篩選層數(shù)f,value最小值ub,value最大值lb;輸出:不動單元序列L。
圖3 Spark下disMake_fixedUnit流程
(2) 在每一個(gè)需要進(jìn)行篩選的Partition之中,挑選value在最小值和最大值之間,并且具有最大weight的minUnit作為這個(gè)部分的最佳minUnit。如果有多個(gè)minUnit都有最大weight,則隨機(jī)挑選一個(gè);如果沒有minUnit的value符合上述條件,則可以無候選minUnit。
(3) 將分布式下各個(gè)部分得到的候選不動單元收集到主節(jié)點(diǎn)。
(4) 模仿(2)中的算法,在收集后的候選不動單元中,找到weight最大的,作為此次算法找到的不動單元FixedUnit,其value值為vf;在多次遞歸之后,可能會出現(xiàn)不存在不動單元的情況,此時(shí)說明所有的數(shù)據(jù)都需要進(jìn)行調(diào)整,而調(diào)整的值可以由ub或者lb確定,因此可直接返回。
(5) 將原先的分布式minUnit分成兩部分,排在FixedUnit之前的作為upper_minUnit,而排在FixedUnit之后的作為lower_minUnit,兩部分的Partition數(shù)量可以適當(dāng)減少。對于兩部分拆開的minUnit,分別遞歸執(zhí)行disMake_fixedUnit算法,算法參數(shù)為:對于upper_minUnit,其value最小值為ub,value最大值為vf,篩選層數(shù)為f-1;對于lower_minUnit,其value最小值為vf,value最大值為lb,篩選層數(shù)為f-1。兩部分同時(shí)進(jìn)行并行操作。
(6) 在兩部分分別得到不動點(diǎn)序列后,將兩部分序列和選出的FixedUnit有序合并,得到不動點(diǎn)序列,進(jìn)行返回。
算法初始輸入中ub和lb為負(fù)無窮大和正無窮大,只要保證在B列的值域兩側(cè)即可。初始α可以決定不動點(diǎn)的分布,α偏大,則不動點(diǎn)更集中于中間,時(shí)間上會得到優(yōu)化,但是有可能會使得原本最佳的不動點(diǎn)被直接過濾;α偏小,則二分遞歸時(shí),時(shí)間可能不平均。初始的f決定了最后不動點(diǎn)的數(shù)量,由于運(yùn)行到后期時(shí),可能得不到不動點(diǎn),所以最終程序返回的不動點(diǎn)序列,其數(shù)量小于2f。
在得到n個(gè)不動單元之后,可以對數(shù)據(jù)進(jìn)行重新分組,根據(jù)ecno在不動單元之間的位置,將所有的minUnit分成n+1組,除了首尾兩組,每一組均存在于兩個(gè)不動單元之間。根據(jù)不動單元給出的上下界和文獻(xiàn)[5]中給出的集中式修復(fù)算法,對minUnit進(jìn)行修復(fù);而對于第一組數(shù)據(jù),沒有下界,可視作下界為負(fù)無窮;對于最后一組數(shù)據(jù),沒有上界,可視作上界為正無窮。算法1給出修復(fù)部分的算法。
算法1OD修復(fù)DisFix
輸入:不動點(diǎn)列表,分布式minUnit。
輸出:修復(fù)后的minUnit。
1. Repartition minUnits according to FixedUnit
2. //parallelly,ineach partition
3. rep←α NULL List
4. for mU∈mniUnits do
5. if (mU value 6. mU value←lowerbound 7. remove mU and add mU into rep 8. else if (mU value>upperbound) then 9. mU value←upperbound 10. remove mU and add mU into rep 11. use Fix to repair mniUnits 12. Return rep+mniUnits 算法1主要側(cè)重的是后半部分的修復(fù),而Spark的RDD數(shù)據(jù)流操作只在前兩行出現(xiàn),后續(xù)的步驟都是并行運(yùn)行于每一個(gè)Partition之中,因此這里給出偽代碼。第1行minUnit進(jìn)行重新分組,之后的第3至12行在每一個(gè)Partition運(yùn)行。首先建立一個(gè)空列表(第3行),用于存放數(shù)值超越上下界的minUnit,對于這部分,直接修改成對應(yīng)的值即算修復(fù)完畢,從原列表移動至rep之中(第5至10行)。這里要注意,重新分組后分成的n+1組數(shù)據(jù)中,除了首尾兩組,其余數(shù)據(jù)都有上下界,而對于第一組數(shù)據(jù),沒有下界,可直接跳過第5至7行;對于最后一組數(shù)據(jù),沒有上界,可直接跳過第8至10行。對于列表中剩下的數(shù)據(jù),值處于上下界之間,因此在該節(jié)點(diǎn)之中,可以視作為單機(jī)的od修復(fù),因此可以使用文獻(xiàn)[5]中的修復(fù)算法。 實(shí)驗(yàn)使用的是一個(gè)真實(shí)數(shù)據(jù)集,關(guān)于美國航班情況的數(shù)據(jù)集,這個(gè)數(shù)據(jù)集也在文獻(xiàn)[2,5]中被使用。本次實(shí)驗(yàn)使用的分布式環(huán)境為Spark,版本為2.0.0,主要啟動參數(shù)見表2。一共進(jìn)行了三組實(shí)驗(yàn),分別探討時(shí)間與元組數(shù)目、分塊數(shù)目的關(guān)系,以及均衡系數(shù)α對于整個(gè)修復(fù)算法的影響。默認(rèn)實(shí)驗(yàn)條件為,元組數(shù)目20萬行,屬性4列,在實(shí)驗(yàn)前,對5%的數(shù)據(jù)添加誤差,觀察最終的修復(fù)情況;而在分布式的參數(shù)中,默認(rèn)分塊數(shù)目p=10,均衡系數(shù)α=0.2。由于分布式下的修復(fù)算法并沒有其他替代算法,因此只是對自身的實(shí)驗(yàn)結(jié)果進(jìn)行分析。 表2 Spark啟動參數(shù)表 實(shí)驗(yàn)1探究的是,不同元組數(shù)目下,算法需要的時(shí)間。元組數(shù)目從10萬行增加至40萬行時(shí),需要的時(shí)間變化情況如圖4所示。由于數(shù)據(jù)量非常大,并且分布式下的算法為啟發(fā)式算法,所以無法用文獻(xiàn)[5]中的集中式算法進(jìn)行時(shí)間和效果上的比較。 圖4 實(shí)驗(yàn)1的實(shí)驗(yàn)結(jié)果 可以看出,元組數(shù)目增加會導(dǎo)致總時(shí)間的增加,但是增加的時(shí)間并非線性增加,而是獲得了亞線性效果。主要原因有以下幾點(diǎn):首先算法復(fù)雜度理論上是線性,計(jì)算時(shí)間隨著元組數(shù)目增加,線性增加。但是隨著實(shí)際數(shù)據(jù)集的變大,在構(gòu)建等價(jià)類的這一步之中,可能會隨著重復(fù)數(shù)據(jù)的增加,改變了每一個(gè)minUnit的權(quán)重,而對minUnit的數(shù)目并沒有產(chǎn)生太大的影響,由此造成整體的曲線偏離線性關(guān)系。另一方面,當(dāng)進(jìn)入算法的第三部分后,需要依賴于文獻(xiàn)[5]中的Fix算法,這個(gè)算法復(fù)雜度是O(nlogn),不過在此之前,有部分的minUnit會因?yàn)樘幵谶@一個(gè)分塊的數(shù)據(jù)范圍之外而被直接修改,因此實(shí)際參與到Fix的數(shù)據(jù)個(gè)數(shù)也有限。如果對于一些數(shù)據(jù),雖然在初始時(shí)增加了誤差,但因?yàn)榧恿苏`差之后還是沒有違背次序依賴,也就不需要被修復(fù),因而總時(shí)間下降。 實(shí)驗(yàn)2探究的是在選擇不同的并行分塊個(gè)數(shù)時(shí),需要的時(shí)間變化情況。分塊數(shù)目從5增加至40,時(shí)間與分塊數(shù)目之間的關(guān)系如圖5所示。 圖5 實(shí)驗(yàn)2的實(shí)驗(yàn)結(jié)果 可以看出,分塊數(shù)目增加,計(jì)算時(shí)間減少,基本呈線性下降的關(guān)系。這種線性關(guān)系是當(dāng)前大數(shù)據(jù)分布式計(jì)算框架的重要特征,因此說明本分布式算法的設(shè)計(jì)是合理的。從并行的角度來說,并行數(shù)目的增加,可以導(dǎo)致每個(gè)機(jī)器上的運(yùn)算量減少,并且數(shù)據(jù)分配時(shí),數(shù)據(jù)可以分配得更均勻。而在本文算法中,并行數(shù)目的增加,首先使得算法第一部分minUnit的生成速度加快,另一方面,第三部分的數(shù)據(jù)修復(fù)需要和分塊數(shù)目同樣大小的不動單元,而不動單元的數(shù)目在第二部分中會隨著遞歸層數(shù)的增加指數(shù)級上升,因此當(dāng)并行數(shù)目上升時(shí),只需在第二部分中增加很少的遞歸深度,便能夠滿足第三部分的不動單元的數(shù)目要求。顯然,計(jì)算時(shí)間并不會隨著并行數(shù)目的增加無限制地減少,甚至可以根據(jù)并行算法的時(shí)間來猜測,繼續(xù)增加并行數(shù)目,甚至有可能使得并行時(shí)間不降反升。主要原因在于忽略了一些基礎(chǔ)時(shí)間并不會完全被并行算法分擔(dān),例如算法中,主節(jié)點(diǎn)上的不動點(diǎn)的篩選;同時(shí),并行數(shù)目的增加會使得中間數(shù)據(jù)轉(zhuǎn)移過程中的數(shù)據(jù)量增加。 實(shí)驗(yàn)3研究均衡系數(shù)α對于算法時(shí)間和修復(fù)質(zhì)量的影響。實(shí)驗(yàn)3分成兩個(gè)部分,當(dāng)α從0增加至0.4時(shí),測試整個(gè)算法的時(shí)間;另一方面,由于實(shí)驗(yàn)前在數(shù)據(jù)中增加了一定的誤差,因此統(tǒng)計(jì)在修復(fù)算法結(jié)束以后,被修改的單元格數(shù)目,作為整個(gè)算法的修復(fù)質(zhì)量。數(shù)據(jù)共有20萬行,4個(gè)屬性,而有5%的數(shù)據(jù)被加入了誤差,因此理論上的錯誤單元格數(shù)量為40 000。但要注意的是,這里的錯誤數(shù)據(jù)可能被加入到了次序依賴的左側(cè)屬性列之中,而修復(fù)算法只是修改右側(cè)的數(shù)據(jù),使得修復(fù)后的結(jié)果滿足給出的次序依賴。圖6為實(shí)驗(yàn)3的結(jié)果。 (a) α與時(shí)間的關(guān)系 從圖6(a)中可以看到,隨著α的變大,整體的修復(fù)時(shí)間是變小的。當(dāng)α較小時(shí),α的增加會大幅縮減算法時(shí)間。這與算法中的相關(guān)分析是相符的。設(shè)計(jì)均衡系數(shù)α的初衷,便是使得在二分確定不動單元時(shí),盡可能使得不動單元靠近整體數(shù)據(jù)的中間,一方面能夠使得下一部的二分算法的兩個(gè)部分?jǐn)?shù)據(jù)規(guī)模相近,另一方面也是使得算法中,在每個(gè)子機(jī)器中使用Fix算法修復(fù)數(shù)據(jù)時(shí),各個(gè)子機(jī)器分得的數(shù)據(jù)規(guī)模相近。在分布式中,并行時(shí)間取決于自節(jié)點(diǎn)的并行時(shí)間最大值,因而合理分配各個(gè)子機(jī)器的數(shù)據(jù)量是非常重要的。在增加α的同時(shí),可能會造成數(shù)據(jù)修復(fù)質(zhì)量的下降,因?yàn)棣恋脑龃笥锌赡軙斐煽煽康牟粍訂卧捎谔幱跀?shù)據(jù)的兩端而直接被篩去,因此需要考慮α對于修復(fù)質(zhì)量的影響,探討α的增大是否會增加被修改的單元格數(shù)目。不過實(shí)際情況下,從圖6(b)中可以看出,α的增加并沒有使得更改的單元格數(shù)目大幅上升,而是幾乎保持不動。由此可以推斷,雖然α增加,但是每一次選擇的不動點(diǎn)單元依舊保持著比較高的質(zhì)量,沒有被污染。從這里可以體現(xiàn)出,算法對于數(shù)據(jù)誤差的高容納性,并且修復(fù)效果很好。另外注意到雖然有40 000個(gè)單元格被污染,但實(shí)際的單元格修改數(shù)目大致為47 016,超過最小修改值約17%。這樣的結(jié)果是可以被接受的,因?yàn)樗惴ㄊ轻槍τ覀?cè)屬性列進(jìn)行修改,而如果某條元組的次序依賴左側(cè)屬性值被誤差干擾,則可能會需要修改多條元組的右側(cè)屬性值來消去此誤差對于次序依賴的影響。 為了提高數(shù)據(jù)質(zhì)量中字典序次序依賴修復(fù)計(jì)算的效率,本文基于SPARK分布式計(jì)算框架提出新的字典序次序依賴分布式修復(fù)算法,通過數(shù)據(jù)和計(jì)算的有效分布,算法有效改善了大數(shù)據(jù)集上的運(yùn)行效率。文章通過實(shí)驗(yàn)驗(yàn)證了本文方法的有效性,并說明了算法參數(shù)的意義和效果。4 實(shí) 驗(yàn)
4.1 實(shí)驗(yàn)數(shù)據(jù)
4.2 時(shí)間與元組數(shù)目
4.3 時(shí)間與分塊數(shù)目的關(guān)系
4.4 均衡系數(shù)的影響
5 結(jié) 語