王寶軍 詹英
摘要: 對于許多應(yīng)用領(lǐng)域不斷產(chǎn)生的數(shù)據(jù)流,面向數(shù)據(jù)流聚集查詢的應(yīng)用最為廣泛。本文在構(gòu)造壓縮桶的基礎(chǔ)上,提出了基于時間維度壓縮數(shù)據(jù)流的算法,來動態(tài)地形成壓縮數(shù)據(jù)流,并進(jìn)一步給出了使用壓縮桶獲得數(shù)據(jù)流聚集查詢的數(shù)學(xué)方法。
關(guān)鍵詞: 數(shù)據(jù)流; 壓縮桶; 聚集查詢; 時間維度
中圖分類號:TP393文獻(xiàn)標(biāo)識碼:A 文章編號:1006-8228(2012)04-29-03
Aggregate compression algorithm for data stream
Wang Baojun, Zhan Ying
(Zhejiang Institute of Communications, Hangzhou, Zhejiang 311112, China)
Abstract: In many fields, data stream continues to grow in terms of generation speed. Aggregate query for data stream was most widely used. By constructing compression buckets, the authors provides in this paper a compression algorithm for data stream based on time dimension, in order to dynamically form compression data stream, and give mathematical method of aggregate query for data stream, by use of compression buckets.
Key words: data stream; compression buckets; aggregate query; time dimension
0 引言
數(shù)據(jù)流是隨著網(wǎng)絡(luò)的廣泛應(yīng)用而出現(xiàn)的一種新的數(shù)據(jù)形式。數(shù)據(jù)流聚集查詢是數(shù)據(jù)流管理與知識發(fā)現(xiàn)系統(tǒng)中一種重要的數(shù)據(jù)知識發(fā)現(xiàn)模型,但快速流動的流數(shù)據(jù)與有限的處理能力之間的矛盾使得流數(shù)據(jù)的聚集查詢分析比關(guān)系數(shù)據(jù)庫的聚集分析更困難。
目前國內(nèi)外已經(jīng)對數(shù)據(jù)流聚集查詢模式展開了研究。Dobra A等人研究利用隨機(jī)草圖技術(shù),提取數(shù)據(jù)流的輪廓,減少數(shù)據(jù)的處理量來加快數(shù)據(jù)處理速度,并提出了一種草圖分割技術(shù)來提高算法的性能[1]。Gilbert A C等人研究采用小波技術(shù)對數(shù)據(jù)流進(jìn)行壓縮,實現(xiàn)了近似聚集查詢[2]。Madden研究了傳感器網(wǎng)絡(luò)中的聚集查詢問題,重點是如何動態(tài)地建立路由樹,實現(xiàn)流水線聚集操作[3,4]。Ahnad Y提出了數(shù)據(jù)流查詢的分布式操作[5]。張冬冬等人提出了一種新的數(shù)據(jù)流傳輸方式,有效地減少網(wǎng)絡(luò)中分布式數(shù)據(jù)流的傳輸量[9]。傅鸝等人建立了基于數(shù)據(jù)流驅(qū)動的數(shù)據(jù)流連續(xù)查詢模型,設(shè)計并使用查詢算子在查詢鏈中的有序組合來構(gòu)造出各種復(fù)雜的連續(xù)查詢語句[7]。李建中等人提出利用多元線性回歸方法來預(yù)測具有線性關(guān)系的數(shù)據(jù)流的未來聚集值,但如果數(shù)據(jù)不具有線性關(guān)系,該模型誤差就會增大[10]。
以上的數(shù)據(jù)流聚集查詢相關(guān)算法采用近似聚集、壓縮數(shù)據(jù)流等技術(shù)來提高查詢速度。由于數(shù)據(jù)流的“流”性和隨機(jī)性,使得流量的變化具有突發(fā)性,然而,商業(yè)活動中,普遍要求能夠?qū)崟r地檢索面向數(shù)據(jù)流的聚集查詢結(jié)果,并獲得更高的準(zhǔn)確率。
1 數(shù)據(jù)流壓縮
1.1 相關(guān)問題描述
數(shù)據(jù)流是一個以數(shù)據(jù)到達(dá)時間為戳的數(shù)據(jù)序列。流數(shù)據(jù)的聚集查詢分為預(yù)定義查詢(Predefined Query)和即席查詢(Real-time Query)兩類。預(yù)定義查詢主要針對數(shù)據(jù)流后續(xù)到來的數(shù)據(jù)計算查詢結(jié)果;而即席查詢是針對數(shù)據(jù)流中流過的所有數(shù)據(jù)。數(shù)據(jù)流源源不斷地流入系統(tǒng),因此無法將所有數(shù)據(jù)流保存起來,為了獲得更為準(zhǔn)確的即時查詢結(jié)果,在聚集查詢中,需要對數(shù)據(jù)流進(jìn)行壓縮。由于數(shù)據(jù)流動態(tài)振蕩流動,面向數(shù)據(jù)流的數(shù)據(jù)流聚集查詢系統(tǒng)無法存儲所有流數(shù)據(jù),而用戶有查詢分析過去與未來流數(shù)據(jù)的需求,因此需要不斷地壓縮數(shù)據(jù)流,來滿足用戶需求。壓縮后的數(shù)據(jù)流結(jié)構(gòu)應(yīng)該是簡單的,方便為用戶提供各類流數(shù)據(jù)聚集查詢,并能夠最大程度地反映原始流數(shù)據(jù)。壓縮后的數(shù)據(jù)流結(jié)構(gòu)是對壓縮后的數(shù)據(jù)流的靜態(tài)特征的描述,它描述數(shù)據(jù)的內(nèi)容和流數(shù)據(jù)之間的相互關(guān)系。
由于數(shù)據(jù)流連續(xù)無限地流動,數(shù)據(jù)流具有時間特征,因此可以在時間維度上壓縮數(shù)據(jù)流。本文采用基于對數(shù)尺度的時間傾斜框架模型[8]來壓縮數(shù)據(jù)流。面向數(shù)據(jù)流壓縮算法以增量的方式對壓縮數(shù)據(jù)流進(jìn)行更新,從而提高數(shù)據(jù)流的壓縮速度,滿足數(shù)據(jù)流聚集查詢的實時性要求。用戶會根據(jù)需求向系統(tǒng)提出多種聚集查詢,這要求壓縮數(shù)據(jù)流盡可能地反映原數(shù)據(jù)流的信息。隨著時間的流逝,流過的流數(shù)據(jù)被不斷地壓縮,歷史流數(shù)據(jù)被不斷地拋棄。
1.2 相關(guān)定義
定義1. 設(shè)PT為時間分區(qū)長度。構(gòu)造壓縮桶Buckcets(BuckcetsID=0…n),壓縮桶有三個抽屜drawer(drawerID=0…2),每個抽屜存放流數(shù)據(jù)的時間長度為2 BuckcetsID×PT。壓縮桶的結(jié)構(gòu)如圖1所示。其中每個桶的2號抽屜是臨時存儲單元。如果0號抽屜是空的,則同一個桶的1號抽屜也空。
設(shè)i(i=0…n)為壓縮桶的編號, i號桶中的抽屜存儲流數(shù)據(jù)的時間長度為2i×PT。每個壓縮桶的第0號與第1號抽屜存放流數(shù)據(jù),2號抽屜是臨時存儲空間,只有當(dāng)這個桶中的第0號與第1號抽屜非空,此時只能將新流入的流數(shù)據(jù)臨時存放到2號抽屜,系統(tǒng)合并此桶的第0號與第1號抽屜,并推入下一桶后,新流入到2號抽屜的流數(shù)據(jù)被轉(zhuǎn)移到同一桶的0號抽屜。例如,第0號桶流入第3個PT時間長度的流數(shù)據(jù),而第0號桶的第0號與第1號抽屜已經(jīng)分別存儲了第1個和第2個PT時間長度,系統(tǒng)壓縮第0號桶的第0號與第1號抽屜,并將流數(shù)據(jù)推入第1號桶的第0號抽屜后,第3個PT時間長度的流數(shù)據(jù)才可以流入第0號桶的第0號抽屜。也就是說,桶號為i的流數(shù)據(jù)來源于桶號為i-1的桶,系統(tǒng)壓縮第i-1號桶的第0號與第1號抽屜,并將流數(shù)據(jù)推入第i號桶的第0號或第1號抽屜。壓縮桶間的數(shù)據(jù)壓縮與流動示意圖如圖2所示。
圖1壓縮桶的結(jié)構(gòu)
圖2壓縮桶間的數(shù)據(jù)壓縮與流動示意圖
引理保存流數(shù)據(jù)的最大時間長度為LongTime,MaxBCount為保存LongTime時長的流數(shù)據(jù)所需壓縮桶的數(shù)量。則
⑴
證明:設(shè)m個桶最多可以存儲流數(shù)據(jù)的時間長度為MaxT(m),則
MaxT=(2×20+2×21+…+2×2m) ×PT
所以MaxT(m)=(2m+1-2)×PT
設(shè)m-1個桶最多可以存儲流數(shù)據(jù)的時間長度為MaxT(m-1),則
當(dāng)時間長度LongTime滿足:
MaxT(m-1)<LongTime≤MaxT(m)
則存儲時間長度LongTime的流數(shù)據(jù)至少需要m個桶。所以:
證畢。
1.3 數(shù)據(jù)流壓縮算法
以商業(yè)零售實際業(yè)務(wù)數(shù)據(jù)流為例,本文將探索針對數(shù)據(jù)流的聚集查詢與壓縮方法。商業(yè)零售數(shù)據(jù)流結(jié)構(gòu)如下:sale(ProductID,OrderQty),sale是超市商業(yè)零售數(shù)據(jù)流,ProductID表示產(chǎn)品編號,OrderQty表示訂貨量。用戶根據(jù)需求提交各類查詢,并請求實時獲得各類查詢結(jié)果。例如,系統(tǒng)根據(jù)用戶提交的產(chǎn)品號 ProductID,選擇相關(guān)產(chǎn)品進(jìn)行壓縮。
定義初始數(shù)據(jù)流結(jié)構(gòu):
Datasourse(timestamp;productID;orderqty),timestamp記錄了流數(shù)
據(jù)到達(dá)的時間點。
定義壓縮后數(shù)據(jù)流的數(shù)據(jù)結(jié)構(gòu):
Compresssourse(starttime; productID; maxorderqty;minorderqty;sumorderqty; countorderqty),starttime表示壓縮的初
始時間; maxorderqty表示訂貨量的最大值;minorderqty表示訂
貨量的最小值;sumorderqty表示訂貨量的總和; countorderqty表
示訂貨次數(shù)。
算法1:數(shù)據(jù)流壓縮算法
輸入:初始數(shù)據(jù)流。
輸出:經(jīng)過壓縮后的數(shù)據(jù)流存儲在桶中,每個抽屜存儲壓縮后的數(shù)據(jù)流。
定義桶的數(shù)據(jù)結(jié)構(gòu):
public struct buckets
{public compresssourse drawer0;
public compresssourse drawer1;
public compresssourse drawer2;}
根據(jù)需存儲的最大時間長度,計算需要的桶數(shù)MAXBcount;
定義桶DataS:
buckets[] DataS = new buckets[MAXBcount];
初始化桶中的所有抽屜;定義記錄時間長度的變量feng;定義時間分區(qū)PT;
While(true)
{根據(jù)用戶提交查詢的產(chǎn)品號ProductID 獲取原始數(shù)據(jù)流;
獲得產(chǎn)生數(shù)據(jù)流的當(dāng)前時間;
if(接收的是第一個流數(shù)據(jù))
{壓縮后直接推入0號桶2號抽屜,它的starttime為被推入流數(shù)據(jù)的
timestamp。接著進(jìn)入下一循環(huán)等待下一個流數(shù)據(jù);}
計算新流入的流數(shù)據(jù)的timestamp與0號桶2號抽屜的starttime相隔時間feng:
if (是同一個時間分區(qū)feng < PT)
{壓縮同一時間分區(qū)內(nèi)的數(shù)據(jù)到0號桶2號抽屜;
回到循環(huán)開頭,繼續(xù)讀下一個數(shù)據(jù);
continue;}
else
{ 記錄當(dāng)前桶號碼;
while (DBcount < MAXBcount)
{if (桶0號抽屜有空)
{將桶2號抽屜的數(shù)據(jù)移到桶0號抽屜;
break;}
else
{if (桶1號抽屜有空)
{將桶2號抽屜的數(shù)據(jù)移到桶1號抽屜;
break; }
else
{if (不是最后一桶)
{將該桶的0號與1號抽屜合并后放入下一桶中的2號抽屜;
該桶的0號與1號抽屜變空;}
else
{丟棄該桶的0號與1號抽屜;}
合并后,該桶0號抽屜空出來,放入該桶2號抽屜的流數(shù)據(jù); }}}
if (0號桶2號抽屜空))
{將新讀入的數(shù)據(jù)放入0桶2號抽屜;
重新設(shè)置starttime;}
else {break;}}}
2 獲得壓縮桶狀態(tài)的數(shù)學(xué)方法
當(dāng)用戶向系統(tǒng)提出面向數(shù)據(jù)流的查詢請求時,系統(tǒng)首先判斷流數(shù)據(jù)被壓縮到哪些桶中,而壓縮流數(shù)據(jù)存儲了最大值、總和等聚集值,使得用戶獲得聚集值變得非常方便。
在壓縮過程的任意時刻,用戶均可能提出獲得流數(shù)據(jù)的聚集值,這要求系統(tǒng)能夠迅速判斷各個桶的狀態(tài),也就是每個桶中的0號抽屜或1號抽屜是否存儲了壓縮數(shù)據(jù)。
假設(shè)j為最后流入桶中的時間分區(qū)流數(shù)據(jù),求每個桶中含有數(shù)據(jù)的抽屜數(shù)。存儲第j個時間分區(qū),需要BCount個桶。則:
,⑵
如果,BCount大于MaxBCount,則從MaxBCount+1到BCount號桶的流數(shù)據(jù)被丟棄。所以,
,j∈N+。
則,
,⑶
其中ai的取值僅為0或1,表示第i個桶中有ai+1個抽屜有流數(shù)據(jù)。ai=0表示0號抽屜存儲了壓縮流數(shù)據(jù),ai=1表示0號與1號抽屜存儲了壓縮流數(shù)據(jù)。
例如j=33,表示持續(xù)流入數(shù)據(jù)流的時間長度為33×PT個時間長度。根據(jù)公式⑵,此時需要的桶數(shù)為5。根據(jù)公式⑶,得到33-25+1=2。則2=0×20+1×21+0×21+0×21+0×21,由此,我們可以得到壓縮桶的狀態(tài)為0號桶、2號桶、3號桶、4號桶的0號抽屜存儲了壓縮數(shù)據(jù),1號桶的0號與1號抽屜存儲了壓縮數(shù)據(jù)。
3 結(jié)束語
本文提出了在時間維度上壓縮數(shù)據(jù)流的方法:不斷流入壓縮桶的流數(shù)據(jù)被不斷地以2為底的對數(shù)尺度進(jìn)行壓縮。實驗表明,壓縮桶結(jié)構(gòu)在滿足了壓縮數(shù)據(jù)的存儲需求的同時,大大減少了存儲空間,桶中的壓縮數(shù)據(jù)能夠隨著時間不斷地更新,基于時間傾斜的數(shù)據(jù)流壓縮算法能夠提高數(shù)據(jù)流的壓縮速度。能夠滿足數(shù)據(jù)流聚集查詢的實時性要求,也能夠提高數(shù)據(jù)流動態(tài)聚集查詢的效率及靈活性。
參考文獻(xiàn):
[1] Dobra A,Garofalakis M,Gehrke J,et a1.Processing Complex Aggregate Queries over Data Streams[C].Proceedings of the 2002ACM SIGMOD International Conference on Management of Data,M acIison.W isconsin.2002.
[2] Gilbert A C,Kotidis M uthukrishnan S M ,et a1.Surfing Wavelets on Streams: One—pass Summaries for Approximate Aggregate Queries[C] .Proceedings of the 27th International conference on Very Large Data Bases.2001
[3] Madden S R,Franklin M J,Hellerstein J M ,et a1.TAG :A Tiny Aggregation Service for Ad—hoc Sensor Networks[C] .Proc.of the 5thSymp.on Operating Systems Design and Implementation,Boston,USA 2002.
[4] Madden S R.Szewczyk R.Franklin M J.et a1.Supporting Aggregate Queries Over Ad—hoc Wireless Sensor Networks[C].Proceedings of the Workshop on Mobile Computing and Systems Applications.Los Alamitos:IEEE Computer Press.2002.
[5] Ahnad Y,Berg B,Cetintemel U,et a1.Distributed operation in the borealis stream processing engine[C].Proc of ACM SIGMOD Conference.Baltimore:[s.n.],2005:882~884
[6] 詹英,吳春明,王寶軍.一種與緩沖區(qū)緊耦合的環(huán)形循環(huán)滑動窗口的數(shù)據(jù)流抽取算法[J].電子學(xué)報,2011.39(4):2262~2267
[7] 傅鸝,魯先志,蔡斌.一種基于數(shù)據(jù)流驅(qū)動的數(shù)據(jù)流連續(xù)查詢模型[J].重慶工學(xué)院學(xué)報(自然科學(xué)),2008.22(10)
[8] Jiawei Han,Micheline Kamber.Data Mining Concepts and Techniques[M].China Machine Press.
[9] 張冬冬,李建中,王偉平,等.分布式復(fù)式數(shù)據(jù)流的處理[J].計算機(jī)研究與發(fā)展,2004.41(10):1780~1785
[10] Li Jian -zhong,Guo Long-jiang,Zhang Dong-dong,et a1.Processing algorithms or predictive aggregate queries over data streams[J].Journal of Software,2005.16(7):1251~1261