劉婧妍,廖湖聲,高紅雨
(北京工業(yè)大學(xué)信息學(xué)部計算機學(xué)院,北京 100124)
在當(dāng)前的大數(shù)據(jù)背景下,很多領(lǐng)域需要對實時快速到來的數(shù)據(jù)(即流數(shù)據(jù))進行分析。流數(shù)據(jù)往往單體價值較小,數(shù)據(jù)價值隨時間后移而降低。傳統(tǒng)對大數(shù)據(jù)的批量計算模式已很難滿足對流數(shù)據(jù)的實時性查詢需求。
復(fù)雜事件處理(Complex Event Processing, CEP)[1]是對流數(shù)據(jù)進行即時處理的一種技術(shù)。復(fù)雜事件處理模型使用事件模型驅(qū)動。在處理數(shù)據(jù)時,使用事件間的因果關(guān)系、模式匹配等對事件進行過濾,對事件做聚合映射,從而篩選出用戶感興趣的信息。復(fù)雜事件處理廣泛應(yīng)用于醫(yī)療[2-3]、物流[4-5]、傳感器網(wǎng)絡(luò)[6-8]等多個領(lǐng)域。
目前大多數(shù)的復(fù)雜事件處理語言都擁有基礎(chǔ)算子[9](選擇、映射、邏輯算子、序列化等),可供用戶實現(xiàn)相應(yīng)的復(fù)雜事件查詢功能。CEL語言是復(fù)雜事件處理系統(tǒng)Cayuga[10-11]上運行的語言。CEL語言在傳統(tǒng)數(shù)據(jù)庫聲明性語言的基礎(chǔ)上擴展序列、迭代算子,以應(yīng)對流數(shù)據(jù)的處理需求。CEL語言不支持窗口。EPL語言及其支持引擎Esper[12]支持對于XML類型數(shù)據(jù)的操作。它包含SQL中的所有算子,同時擁有連接、過濾、聚合算子。EPL語言支持自定義函數(shù)操作。但是其支持的函數(shù)只能處理同一類的所有數(shù)據(jù),比如滿足篩選條件的同一屬性數(shù)據(jù),而不能對其中的單個屬性進行操作。以上的復(fù)雜事件處理語言都是針對流數(shù)據(jù)進行批量處理,無法針對數(shù)據(jù)進行細粒度的篩選和操作。XSeq語言及其系統(tǒng)[13-15]主要針對類型更加復(fù)雜的半結(jié)構(gòu)化數(shù)據(jù)(XML流數(shù)據(jù))進行查詢。該語言可以處理基本的XPath查詢結(jié)構(gòu),并添加了順序約束和克林閉包。XSeq語言在基礎(chǔ)的類數(shù)據(jù)庫查詢的同時,增加了部分內(nèi)置函數(shù)以提供聚合的功能。例如計數(shù)、求均值、平均值等。XSeq對于XML類型數(shù)據(jù)的查詢針對性較強,但其僅支持對于批量的XML元素做統(tǒng)一的查詢,同時其描述相對復(fù)雜。CEStream是一種支持分布式流數(shù)據(jù)復(fù)雜事件處理的語言[16-17]。該語言主要針對XML數(shù)據(jù)進行檢測,支持基礎(chǔ)的SQL算子,以及常見的流數(shù)據(jù)處理算子連接、過濾、聚合等,并增加了順序約束、克林閉包等操作。該語言提供了正規(guī)樹模式匹配功能,并且支持結(jié)構(gòu)連接,同時可以對多個事件源進行組合處理,事件處理能力較強。以上的復(fù)雜事件處理語言都難以滿足對流數(shù)據(jù)做靈活的細粒度查詢功能。
為滿足復(fù)雜事件處理的細粒度處理需求,本文設(shè)計一種CEStream語言的自定義函數(shù)功能。通過定義函數(shù)形式參數(shù)的流模式匹配功能,實現(xiàn)對流數(shù)據(jù)標(biāo)簽的逐個處理。同時,在自定義函數(shù)中給用戶提供對原有語句封裝、參數(shù)化變量,提高語言的可用性。本文主要工作為:
1)為復(fù)雜事件處理語言CEStream擴展用戶自定義函數(shù)(User-Defined Function, UDF)功能。擴展的UDF主要分為2類:支持形參模式匹配的細粒度處理UDF和參數(shù)化變量的UDF。
2)提出一種流模式匹配方法,使用戶自定義函數(shù)支持對流數(shù)據(jù)的細粒度檢測。
3)設(shè)計一種參數(shù)化變量的用戶自定義函數(shù)。該函數(shù)給用戶提供對原有查詢語句的封裝功能,增加語言的復(fù)用性和靈活性。
4)設(shè)計并實現(xiàn)CEStream語言的用戶自定義函數(shù)運行系統(tǒng)。實驗表明在完成相同功能查詢時使用自定義函數(shù),在提高語言可用性的同時,吞吐量并未下降;支持細粒度檢測功能的自定義函數(shù)加強了CEStream復(fù)雜事件處理語言的檢測功能。
原有的復(fù)雜事件處理語言CEStream主要處理語句有構(gòu)造流數(shù)據(jù)、構(gòu)造模式這2種語句。在用戶使用該語言時,每當(dāng)需要改變語句中的變量都需要對整個語句完全重寫,用戶體驗不友好,同時容易造成程序代碼冗余。為解決這種問題,可以設(shè)計用戶自定義函數(shù)將原有語句放在函數(shù)內(nèi)并將變量參數(shù)化,使用戶在需要改動變量值時改變調(diào)用函數(shù)的傳入?yún)?shù)即可。同時,自定義函數(shù)的設(shè)計帶給該語言初步模塊化的功能,使用戶在求解問題時可以將問題分解為幾個模塊進行分析。
由于各行業(yè)對于復(fù)雜事件處理語言處理數(shù)據(jù)的能力要求越來越高,原有針對流數(shù)據(jù)中每個事件進行相同篩選、批量處理的操作已經(jīng)不能滿足使用者的需求。例如,股市、核電站警報等都需要復(fù)雜事件處理語言能夠?qū)?shù)據(jù)進行一種上升趨勢的篩選和處理。具體的查詢案例如表1所示。
表1 查詢案例
對于表1案例中這種上升趨勢的判定,實際是前2個事件中的數(shù)據(jù)進行比較,根據(jù)不同的比較結(jié)果,進入不同的分支。這是對于復(fù)雜事件所進行的細粒度處理,用CEStream原有語句對每一個事件都進行相同篩選的操作是無法完成的。其他復(fù)雜事件處理語言中,只有Esper語言[12]支持這種細粒度的數(shù)據(jù)操作。但Esper語言對于這類操作的語言寫法相對復(fù)雜,對于遞增的n個連續(xù)事件,必須有n條語句與之對應(yīng)。因此有必要設(shè)計一種令CEStream語言支持對元素的逐個處理,可以擴展一種支持形式參數(shù)模式匹配的自定義函數(shù)。該函數(shù)的形式參數(shù)中可以寫一種只匹配一個到幾個XML元素標(biāo)簽,同時可以嵌套匹配深層XML元素標(biāo)簽的模式匹配表達式。通過對XML數(shù)據(jù)流的頭部進行匹配,達到對流數(shù)據(jù)進行細粒度處理的效果。同時,可以通過遞歸調(diào)用函數(shù)的形式,實現(xiàn)對流數(shù)據(jù)的整個處理過程。
在復(fù)雜事件處理語言CEStream中擴展用戶自定義函數(shù)功能,給出一種通過UDF實現(xiàn)細粒度處理流數(shù)據(jù)的方法并增加該語言的靈活性。UDF提供流的參數(shù)模式匹配功能,使CEStream語言能夠支持對于一個到幾個XML標(biāo)簽流的匹配及操作,并通過對于XML標(biāo)簽內(nèi)容的篩選進入不同的分支,提供細粒度檢測數(shù)據(jù)的方法;同時實現(xiàn)對CEStream原有創(chuàng)建模式、創(chuàng)建流語句的封裝,使原有變量參數(shù)化。
2.1.1 支持形式參數(shù)模式匹配的細粒度處理UDF
CEStream語言的用戶自定義函數(shù)為滿足對XML數(shù)據(jù)段的細粒度查找操作,使用函數(shù)重載的機制對每一個XML標(biāo)簽組做出篩選,并能夠根據(jù)其實際匹配的形式參數(shù)類型,實現(xiàn)對不同標(biāo)簽組,進行不同處理的操作。
表2給出一個監(jiān)控核電站核心溫度的自定義函數(shù)案例。該案例所對應(yīng)的查詢需求如表1所示。為監(jiān)控核電站溫度是否過高,可通過定義函數(shù)tempUp、 tempUp1進行處理。如表2中給出的函數(shù)定義以及形式參數(shù)注釋所示,其中tempUp為入口函數(shù),判斷第一個eve事件的溫度是否大于閾值$v,同時前2個eve事件是否后一個大于前一個。tempUp1函數(shù)為一組重載函數(shù),針對后續(xù)的邏輯進行判斷。
表2 監(jiān)控核心溫度函數(shù)案例
函數(shù)tempUp有2個形式參數(shù),分別為流模式匹配表達式eve(t($a)$b) eve(t($c)$d) $e與閾值 $v。調(diào)用該函數(shù)時,傳入的第一個流數(shù)據(jù)參數(shù)必須與流模式匹配表達式匹配。
重載函數(shù)tempUp1有2個同名函數(shù),第一個函數(shù)判斷結(jié)尾處是否符合最后一個事件的溫度大于第一個事件溫度的1.15倍;第二個函數(shù)判斷中間狀態(tài)是否一直符合遞增關(guān)系。這2個函數(shù)的形式參數(shù)列表有部分不同,下面逐個說明第一個函數(shù)定義的形式參數(shù)含義:常量匹配表達式0,閾值$v,首個事件溫度$t1st,已得到的流數(shù)據(jù)頭部$h,流模式匹配表達式eve(t($a)$b)$e。當(dāng)調(diào)用函數(shù)tempUp1時,根據(jù)傳入?yún)?shù)成功匹配的形參,進行調(diào)用。
用戶可以通過自定義函數(shù)將一個或多個功能封裝起來,對外提供調(diào)用接口。在為復(fù)雜事件處理語言CEStream增加用戶自定義函數(shù)功能時,可以考慮將一些固定的模式匹配語句、創(chuàng)建流語句放在函數(shù)內(nèi)。也可將其中一些調(diào)用函數(shù)時經(jīng)常更改的設(shè)定值作為函數(shù)的參數(shù)傳入,設(shè)計一種參數(shù)化模式的函數(shù)。
例如復(fù)雜事件流處理中的典型應(yīng)用火情檢測,這種參數(shù)化模式的函數(shù),可使得處理操作更便捷,對火情檢測的細微調(diào)整等更加方便。表3給出了通常情況下的檢測案例:30 s內(nèi)出現(xiàn)連續(xù)3次溫度檢測值大于80 ℃,則發(fā)出一條報警消息。但由于環(huán)境不同,檢測設(shè)定的溫度閾值可能改變。使用表3所示的函數(shù)調(diào)用改變調(diào)用時傳入的參數(shù),即可改變溫度閾值。
表3 火情檢測案例
參數(shù)化模式的用戶自定義函數(shù),使CEStream語言更進一步模塊化。它在實現(xiàn)對正規(guī)樹模式匹配、事件流模式封裝的同時,使用參數(shù)化模式的方式,使得在調(diào)用該函數(shù)時對函數(shù)內(nèi)部的模式匹配標(biāo)簽以及創(chuàng)建流的時間窗口大小等變量可以進行調(diào)整。使用CEStream語言的用戶可以實現(xiàn)自定義函數(shù),并在調(diào)用函數(shù)時傳入不同參數(shù),以達到不同的篩選效果。
以下是復(fù)雜事件處理語言CEStream擴展的UDF的核心語法以及相關(guān)內(nèi)置函數(shù)設(shè)置的描述。
1)CEStream語句語法。
正規(guī)樹模式定義語法與事件流定義語法如表4與表5所示。
表4 正規(guī)樹模式定義語法
表5 事件流定義語法
在擴展CEStream語言時,將原有的pd(正規(guī)樹模式匹配)、sd(事件流模式)以及新增的賦值語句統(tǒng)稱為語句(statement)作為用戶可以書寫的語句,并擴展變量(ID)、常量(PRIMITIVE_TYPE_CONST)、函數(shù)調(diào)用(func_call)、四則運算表達式、條件判斷表達式這5種表達式。其中四則運算表達式、條件判斷表達式不再給出詳細的文法規(guī)定。下面是CEStream語句語法。
statement_list→statement_list ‘,’ statement | empty
statement→pd | sd | ID=expression ‘;’
expression→ID | CONST | func_call
| arithmetic_expr | if_expr
|expression‘,’expression
func_call→ID ‘(’ expression [‘,’ expression] ‘)’
2)UDF定義語法。
fd→define function ID ‘(’ paramlist‘)’ ‘{’ func_body ‘}’
paramlist→paramlist ‘,’ param | param
param→CONST | arithmetic_expr
| pattern_match
pattern_match→ID | NULL | TAG(pattern_match) pattern_match
func_body→[ statement ] return expression
用戶自定義函數(shù)定義(fd)中包含函數(shù)名、形式參數(shù)以及函數(shù)體這3部分。其中形式參數(shù)(param)為基礎(chǔ)類型常量(CONST)或流模式匹配表達式(pattern_match)或四則運算表達式。函數(shù)體(func_body)包括零個或多個CEStream語句(statement)以及一個返回值表達式。
當(dāng)調(diào)用自定義函數(shù)時,通過傳入的實際參數(shù)與函數(shù)定義中形式參數(shù)相應(yīng)位置的流模式匹配表達式是否匹配決定是否調(diào)用該函數(shù)。對于函數(shù)重載形式的同名函數(shù),根據(jù)順序依次對函數(shù)調(diào)用語句的實參和函數(shù)定義的形參進行匹配,實參和形參全部匹配成功則確定調(diào)用語句調(diào)用的是該函數(shù)。
3)UDF細粒度查詢內(nèi)置函數(shù)描述。
對于流數(shù)據(jù)的處理,可以使用內(nèi)置函數(shù)。內(nèi)置函數(shù)操作流數(shù)據(jù)共分為4種:head、 tail、 isnull、 cons。這4種內(nèi)置函數(shù)操作,分別針對流數(shù)據(jù)取頭、取尾、判斷變量是否為空、連接2個數(shù)據(jù)段。
可以作為內(nèi)置函數(shù)參數(shù)的語法結(jié)構(gòu)為:符合內(nèi)置函數(shù)輸入?yún)?shù)類型約定的用戶自定義函數(shù)調(diào)用語句,內(nèi)置函數(shù),流模式匹配表達式,變量。
系統(tǒng)設(shè)計部分主要對于自定義函數(shù)的處理進行說明,本系統(tǒng)分為編譯模塊、查詢預(yù)處理模塊、數(shù)據(jù)初步處理模塊、集群這4部分。其中編譯模塊增加對語句、自定義函數(shù)定義、調(diào)用的編譯功能;查詢預(yù)處理模塊區(qū)分細粒度處理UDF和參數(shù)化模式的UDF;數(shù)據(jù)預(yù)處理模塊分別對2種函數(shù)調(diào)用做不同處理,實現(xiàn)新增的細粒度數(shù)據(jù)處理功能;集群對數(shù)據(jù)進行后續(xù)篩選工作,并輸出最后的查詢結(jié)果。
圖1為該系統(tǒng)的活動圖,說明了各個模塊的工作流程和模塊之間的關(guān)系。
圖1 自定義函數(shù)處理系統(tǒng)活動圖
1)編譯模塊是系統(tǒng)最初接收用戶輸入的CEP語言的模塊,該模塊做下列操作:
①對CEP語言進行語法分析及語義分析,生成查詢計劃。
②將查詢計劃發(fā)送給查詢預(yù)處理模塊。
2)查詢預(yù)處理模塊從編譯模塊接收查詢計劃,進行下列操作:
①接收查詢計劃,對其進行預(yù)處理。區(qū)分2種不同的自定義函數(shù)。對包含原有語句的參數(shù)化模式函數(shù)做預(yù)處理,對每一個數(shù)據(jù)源生成一個單源查詢計劃。對支持形參模式匹配的細粒度處理函數(shù),將相互之間有調(diào)用關(guān)系的函數(shù)關(guān)聯(lián),合并生成細粒度函數(shù)查詢計劃。
②根據(jù)函數(shù)調(diào)用,分析變量間的綁定關(guān)系,并將其與查詢計劃發(fā)送到集群中做初始化操作。
③根據(jù)調(diào)用函數(shù)需要的數(shù)據(jù)源信息,將數(shù)據(jù)源和對應(yīng)的數(shù)據(jù)處理模塊連接,并發(fā)送查詢計劃給其對應(yīng)的數(shù)據(jù)處理模塊。
3)數(shù)據(jù)初步處理模塊接收查詢預(yù)處理模塊生成的查詢計劃,做以下操作:
①對單源查詢計劃生成模式樹,對實時到來的XML數(shù)據(jù)流進行正規(guī)樹模式匹配并不斷輪詢。
②對細粒度函數(shù)查詢計劃,根據(jù)函數(shù)調(diào)用形參中的流模式匹配表達式,以及函數(shù)內(nèi)的語句對XML流數(shù)據(jù)進行篩選。該部分采用惰性求值的方法,對需要值的部分求值并輸出,后續(xù)的流數(shù)據(jù)以閉包形式保存,以解決對與流數(shù)據(jù)遞歸調(diào)用的問題。
4)集群使用接收到的查詢計劃做初始化操作,并在得到數(shù)據(jù)流后對其進行匹配、篩選等操作,輸出最終的查詢結(jié)果。
本章通過實驗分析用戶自定義函數(shù)的性能以及功能擴展。實驗主要從自定義函數(shù)中使用原有語句時對查詢性能的影響,以及其增加的細粒度數(shù)據(jù)處理功能這2個方面進行。主要分析:1)在完成相同功能的查詢時,由自定義函數(shù)封裝的語言和未封裝的語言之間的性能差異;2)用戶自定義函數(shù)對原有復(fù)雜事件處理查詢語言的功能擴展。本實驗的數(shù)據(jù)源為網(wǎng)站服務(wù)器中各種XML格式的記錄日志。本章所有實驗的軟件環(huán)境為JDK1.8,硬件環(huán)境為Intel Xeon E5-1607 3.0 GHz、 6 GB內(nèi)存的PC機。
表6是本實驗的測試案例。案例Q1.1、 Q1.2是使用原有查詢語句進行查詢的案例,作為對照組。案例Q2.1~Q2.4使用用戶自定義函數(shù)進行查詢,其中Q2.1的查詢效果和案例Q1.1相同,Q2.2~Q2.4的查詢效果和案例Q1.2相同。實驗通過使用用戶自定義函數(shù)前后的性能對比,判斷調(diào)用自定義函數(shù)對查詢性能的影響。
表6 測試案例
如圖2與圖3所示,單個數(shù)據(jù)源輸入案例Q2.1和原有語句的對照組Q1.1在進行檢測時,吞吐量均為24000(事件數(shù)/s)左右;多個數(shù)據(jù)源輸入的案例中,使用UDF的Q2.2~Q2.4和其對照組Q1.2進行檢測,吞吐量為19500(事件數(shù)/s)左右。實驗表明,對于相同的查詢需求,使用自定義函數(shù)來實現(xiàn),在增加程序的可擴展性的同時,對查詢效率的影響很小。
圖2 單源案例吞吐量對比
圖3 多源案例吞吐量對比
另外,為了測試用戶自定義函數(shù)為CEStream復(fù)雜事件處理語言增加的細粒度查詢能力,在各個需要復(fù)雜事件處理的行業(yè)中,篩選出100個需要對數(shù)據(jù)進行細粒度處理的場景進行功能性測試。這些場景包括股票分析、環(huán)境溫度檢測、故障檢測等各個行業(yè)。實驗證明用戶自定義函數(shù)可以很好地支持這些細粒度查詢要求。
本文提出了一種為復(fù)雜事件處理語言增加用戶自定義函數(shù)功能的方案,并給出了自定義函數(shù)部分的語法設(shè)計及其運行系統(tǒng)。用戶自定義函數(shù)使復(fù)雜事件處理語言增加了細粒度處理流數(shù)據(jù)的能力,并使用戶能夠通過函數(shù)對查詢功能進行封裝。