武志學
1.成都五舟漢云科技有限公司,成都 611731; 2.成都信息工程大學 信息安全工程學院,成都 610225)(*通信作者電子郵箱zhixue.wu@gmail.com)
基于Spark Streaming的實時能耗分項計量系統(tǒng)
武志學1,2*
1.成都五舟漢云科技有限公司,成都 611731; 2.成都信息工程大學 信息安全工程學院,成都 610225)(*通信作者電子郵箱zhixue.wu@gmail.com)
能耗分項計量能夠準確、及時、有效地發(fā)現能源使用問題,形成和實現最有效的節(jié)能措施。能耗分項計量系統(tǒng)需要對各項能源使用量在不同粒度上進行統(tǒng)計,既有實時性的需求,又需要涉及到聚合、去重、連接等較為復雜的統(tǒng)計需求。由于數據產生快、實時性強、數據量大,所以很難統(tǒng)一采集并入庫存儲后再作處理,這便導致傳統(tǒng)的數據處理架構不能滿足需求。為此,提出基于Spark Streaming大數據流式技術構建一個實時能耗分項計量系統(tǒng),對實時能耗分項計量的系統(tǒng)架構和內部結構進行了詳細介紹,并通過實驗數據分析了系統(tǒng)的實時數據處理能力。與傳統(tǒng)架構不同,實時能耗分項計量系統(tǒng)在數據流動的過程中實時地進行捕捉和處理,一方面把捕捉到的異常信息及時報警到前端,同時把分類分項統(tǒng)計處理的結果保存到數據庫,以便進行離線分析和數據挖掘,能有效地解決上述數據處理過程中遇到的問題。
流式計算;能耗分項計量;Spark Streaming;Apache Kafka;大數據
伴隨著我國城市化進程的加快,大型公共建筑節(jié)能工作勢在必行。如何達到既滿足使用及舒適度的需求,又能科學、合理地節(jié)能降耗已經是全社會所要思考的問題。在大力推廣節(jié)能減排的階段,要達到最快、最明顯的節(jié)能效果,不單是采用設備節(jié)能手段,更需要使用分項計量準確、及時、有效地發(fā)現能源使用問題,形成和實現最有效的節(jié)能措施。能耗分項計量是指對建筑的水、電、燃氣、集中供熱、集中供冷等各種能耗進行監(jiān)測,從而得出建筑物的總能耗量和不同能源種類、不同功能系統(tǒng)的能耗量[1]。要實現分項計量,必須進行數據采集、數據傳輸、數據存儲和數據分析等。所以,能耗分項計量是一個典型的流式大數據系統(tǒng),具有數據量大、數據產生速度快、數據結構復雜等特點。
一般情況下,能耗分項計量包括空調系統(tǒng)、電梯系統(tǒng)、給排水系統(tǒng)、通風系統(tǒng)、照明系統(tǒng)及辦公設備系統(tǒng)等。對于用能密度高、單體設備耗能大的集中空調系統(tǒng),應進行更細致的計量,包括:冷凍主機用電量、冷凍水泵用電量、冷卻水泵用電量、冷卻塔風機用電量、空調箱和新風機用電量等。所以進行能耗分項計量時,需要對各項能源使用量在不同粒度上對不同數據進行統(tǒng)計,既有實時性的需求,又需要涉及到聚合、去
圖1 基于傳統(tǒng)數據處理模式的分項計量系統(tǒng)
重、連接等較為復雜的統(tǒng)計需求。由于數據產生快、實時性強、數據量大,如果采取傳統(tǒng)的數據處理架構,首先對采集到的數據入庫存儲,然后再作處理,很難滿足分項計量的需求。特別是為了找到能耗使用規(guī)律提出有效節(jié)能措施,不但需要部署大量能耗采集儀表,還需要進行更為復雜的數據處理,從而引起在單位時間內要處理的實時數據量和計算工作量同時大幅上升,這便導致傳統(tǒng)的數據處理架構不能滿足需要。為了解決這個問題,本文通過使用Apache Kafka和Spark Streaming模塊構建了一個實時流式數據處理系統(tǒng)來進行能耗分項計量。與傳統(tǒng)架構不同,實時流式數據處理系統(tǒng)在數據流動的過程中實時地進行捕捉和處理,并根據業(yè)務需求對數據進行計算分析,一方面把捕捉到的異常信息及時報警到前端,同時把分類分項統(tǒng)計處理的結果保存到數據庫,以便進行離線分析和數據挖掘。本文將詳細描述實時能耗分項計量的系統(tǒng)架構和內部結構,并對架構中所使用的大數據技術和系統(tǒng)進行介紹和分析,最后,通過實際測試結果對實時能耗分項計量系統(tǒng)的實時數據處理能力進行驗證和分析。
清華大學節(jié)能研究中心研制開發(fā)了能耗分項計量實時分析系統(tǒng)EM-II[2],包括數據采集子系統(tǒng)、數據處理子系統(tǒng)、數據分析展示子系統(tǒng)三大核心部分,另外還有信息維護、數據上報、系統(tǒng)監(jiān)測等幾個子系統(tǒng)。數據采集子系統(tǒng)利用安裝在現場的具有數字通信接口的電計量表和超聲波冷熱量表采集數據,并由數據采集器匯總接收通過網關由路由器連接到互聯網,將數據遠程傳輸回數據中心服務器。數據處理子系統(tǒng)負責校驗解析接收到的原始數據, 并根據能耗模型拆分計算得到分類分項數據。數據分析展示子系統(tǒng)將經過數據處理后的分類分項能耗數據進行分析、匯總和整合,一方面通過靜態(tài)或者動態(tài)的圖表方式將能耗數據展示出來,另一方面能夠提供針對第三方的數據接入服務和數據發(fā)布服務。
Hysine與多個高等院校及科研機構合作研制開發(fā)的EMC-2000建筑設備節(jié)能控制與管理系統(tǒng)[3],適用于新建、改建、擴建項目中建筑機電設備能效跟蹤控制節(jié)能管理。整個能源管理系統(tǒng)由管理中心、主干通信網絡、數據采集器、智能電表等組成,同時為與上一級能耗監(jiān)測和管理系統(tǒng)連接預留系統(tǒng)接口。能源管理中心通過對現場數據采集器上傳的數據進行存儲、統(tǒng)計和分析,為業(yè)主提供有效的能源使用和持續(xù)的能源節(jié)約提供實施依據。
安科瑞開發(fā)的Acrel-5000建筑能耗分析管理系統(tǒng)[4]以計算機、通信設備、測控單元為基本工具,根據現場實際情況采用現場總線、光纖環(huán)網或無線通信中的一種或多種結合的最優(yōu)化的組網方式,為大型公共建筑的實時數據采集及遠程管理與控制提供了基礎平臺,它可以和檢測設備構成任意復雜的監(jiān)控系統(tǒng)。
這些能耗分項計量系統(tǒng)都是參照國家住建部《國家機關辦公建筑和大型公共建筑能耗監(jiān)測系統(tǒng)》[5]和《國家機關辦公建筑和大型公共建筑能耗監(jiān)測系統(tǒng)省、市級數據中心數據庫結構文檔》[6],采用了傳統(tǒng)的數據處理模式,如圖1所示。當數據采集程序接收到數據采集器發(fā)送的數據以后,首先把數據寫入計量表原始數值數據庫(D);然后再由拆分程序按照各個儀表和能耗數據各級分項進行拆分和統(tǒng)計,并把結果寫入分類分項能耗數據庫(B);最后再由分析展示程序基于建筑基本情況數據庫(A)、分類分項能耗數據庫(B)進行數據分析并展示給用戶。
這種基于傳統(tǒng)數據處理模式的分項計量系統(tǒng)只能適用于采集點數量比較少、統(tǒng)計分析比較簡單的環(huán)境。在采集點數量達到上千時,隨著時間的推移,分類分項能耗數據庫的數據會不斷累計快速增加,從而可能導致拆分程序無法及時完成對數據的拆分和統(tǒng)計。
為了解決基于傳統(tǒng)數據處理模式的能耗分項計量系統(tǒng)存在的問題,本文設計并實現了一個基于Spark Streaming和Apache Kafka等大數據技術的實時能耗分項計量系統(tǒng)。在本章首先對Spark Streaming和Apache Kafka大數據技術進行簡單介紹,然后描述如何使用Spark Streaming和Apache Kafka模塊構建基于實時流式數據處理架構的實時能耗分項計量系統(tǒng)。
2.1 Spark Streaming
Apache Spark是一個基于內存的、可以支持各種大數據應用場景的、高性能和高容錯的開源大數據平臺[7]。Spark Streaming是Apache Spark的一個子項目,是一個運行在Spark引擎之上的實時處理工具[8]。
與Hadoop[9]大數據處理平臺不同,Spark建立在統(tǒng)一抽象的RDD(Resilient Distributed Datasets)之上,使得它可以以基本一致的方式應對各種大數據處理場景,包括MapReduce、Streaming、SQL、Machine Learning以及Graph等。
Spark的另一個特點就是其高性能和容錯性。Spark是一種粗粒度數據并行的計算范式,計算的主體是數據集合RDD,而非個別數據。RDD集合內的所有數據都經過同樣的算子序列,數據并行可編程性好,易于獲得高并行性(與數據規(guī)模相關,而非與程序邏輯的并行性相關),也易于高效地映射到底層的并行或分布式硬件上[10]。RDD是一個容錯的、并行的數據結構,在保證容錯的前提下,用內存來承載工作集。內存的存取速度快于磁盤多個數量級,從而可以極大提升性能[11]。Spark的容錯是通過重放日志更新而取得的。因為Spark的函數式語義和冪等特性,重放日志更新RDD不會有副作用。另外,Spark記錄的是粗粒度的RDD更新,所以容錯的開銷可以忽略不計。
Spark的實時性特點是通過Spark Streaming實現的。Spark Streaming將流式計算分解成一系列短小的批處理作業(yè),也就是把輸入數據流按照批次大小(如1 s)分成一段一段的數據形成DStream(Discretized Stream),而每一段數據都轉換成Spark中的RDD,如圖2所示。
圖2 DStream的組成
Spark Streaming提供了兩種操作類型,分別為Transformations和Output操作。對DStream的Transformation操作變?yōu)镾park中對RDD的Transformation操作,從一個已知的DStream經過轉換得到一個新的DStream;而且Spark Streaming還額外增加了一類針對“窗口”(Window)的Transformation操作,可以更靈活地控制DStream的大小(時間間隔大小、數據元素個數等)。整個流式計算根據業(yè)務的需求可以對中間的結果進行疊加,或者使用Output操作將DStream數據輸出到一個外部的存儲系統(tǒng),如數據庫或文件系統(tǒng)等。
Spark具有極高的擴展性與吞吐量。根據Spark官方網站FAQ,最大的已知Spark集群有8 000個節(jié)點[12];并且隨著大數據增多,預計集群規(guī)模也會隨之變大,以便繼續(xù)滿足吞吐量方面的預期。另外,使用Spark的EC2 啟動腳本可以輕松地在Amazon EC2上啟動一個獨立集群。Spark目前在EC2上已能夠線性擴展到100個節(jié)點(每個節(jié)點4核),可以以數秒的延遲每秒處理6 GB的數據量[11]。
一個Spark集群由多個工作節(jié)點(Worker Node)組成,每個工作節(jié)點可以運行一個或多個Executor,如圖3所示。Executor是一個用于應用程序或者工作節(jié)點的進程,負責處理Tasks,并將數據保存到內存或者磁盤中。每個應用程序都有屬于自己的Executor,一個Executor則包含了一定數量的Slots來運行分配給它的任務。在Spark中,每個作業(yè)(Job)都被分隔成多個彼此依賴稱之為Stage的Task。 一個Task就是一個工作單元, 可以發(fā)送給一個Executor執(zhí)行。 Task是用來執(zhí)行應用的實際計算工作。 每個Task占用Executor的一個Slot。
圖3 Spark Streaming集群架構
Spark Streaming流計算可以在數據產生并流入系統(tǒng)時就進行處理并馬上得出結果,非常適合能耗分項計量中能耗數據不斷產生的場景和對實時性的需求。
選擇Spark Streaming的另一個原因是因為Spark可以在支持實時流式處理的同時,進行高效的批處理、交互式SQL查詢和數據挖掘,從而可以使能耗分項計量系統(tǒng)不但可以實時地為用戶捕捉能耗異常情況進行報警,還可以提供離線統(tǒng)計分析和數據挖掘的服務。
2.2 Apache Kafka
Apache Kafka是一個分布式的、高吞吐量的、易于擴展的基于主題發(fā)布/訂閱的消息系統(tǒng),最早是由 LinkedIn 開發(fā),并于2011年開源并貢獻給 Apache 軟件基金會[13]。
Kafka的邏輯架構如圖4所示。Kafka對消息保存時根據話題(Topic)進行歸類,發(fā)送消息者成為生產者(Producer),消息接受者成為消費者(Consumer)。此外Kafka集群由多個服務器組成,每個服務器成為代理(Broker)。無論是Kafka集群,還是Producer和Consumer都依賴于Zookeeper來保證系統(tǒng)可用性。
圖4 Kafka的邏輯架構
一個話題可以認為是一類消息,每個話題將被分成多個分區(qū)(Partition)。設計分區(qū)的最根本原因是Kafka基于文件存儲,通過分區(qū)可以將日志內容分散到多個服務器上,來均衡負載,保證消息保存/消費的效率。如果一個話題對應一個文件,那這個文件所在的機器IO將會成為這個話題的性能瓶頸,而有了分區(qū)后,不同的消息可以并行寫入不同代理的不同分區(qū)里,屬于順序寫磁盤,因此效率非常高,極大地提高了Kafka的吞吐率。所以,消息分區(qū)是Kafka高吞吐率的一個很重要的保證,即使在非常廉價的商用機器上也能做到單機支持每秒104條以上消息的傳輸[14]。此外,越多的分區(qū)意味著可以容納更多的消費者,可以有效提升并發(fā)消費的能力。Kafka的消息分區(qū)結構如圖5所示。
圖5 Kafka消息分區(qū)結構
與傳統(tǒng)的消息系統(tǒng)不同,Kafka系統(tǒng)中存儲的消息沒有明確的消息ID,消息通過日志中的位置稱為偏移量來唯一標記一條消息,這樣就避免了維護密集尋址,用于映射消息ID到實際消息地址的隨機存取索引結構的開銷。這種設計大大提高了Kafka的性能。
Kafka的另外一個創(chuàng)新是即使消息被消費,消息仍然不會被立即刪除。日志文件將會根據代理中的配置,保留一定的時間之后刪除;比如日志文件保留2 d,那么之后文件會被清除,無論其中的消息是否被消費。Kafka通過這種簡單的手段來釋放磁盤空間,從而可以減少消息消費之后對文件內容改動的磁盤IO開支。
Kafka還有一個創(chuàng)新點就是Kafka代理是無狀態(tài)的,由消費者維護已消費的狀態(tài)信息。這種設計的一個好處就是消費者可以退回到老的偏移量再次消費數據。因為代理是無狀態(tài)的,它不需要標記哪些消息被哪些消費者消費過,也不需要代理去保證同一個消費者組里只有一個消費者能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
為了提高可用性,Kafka可以配置分區(qū)需要備份的個數,每個分區(qū)將會被備份到多臺Kafka服務器上,以提高可用性。每個分區(qū)都有一個Kafka服務器為領導者(Leader),負責所有的讀寫操作。如果領導者失效,那么將會有其他跟隨者(Follower)來接管成為新的領導者。跟隨者只是單調地和領導者跟進,同步消息即可。從集群的整體考慮,Kafka會將領導者均衡地分散到每個Kafka服務器上,來確保整體的性能穩(wěn)定。
Kafka可以同時支持離線數據處理和實時數據處理。如圖6所示,Kafka同時支持點到點分發(fā)模型,即多個消費者共同消費隊列中某個消息的單個副本,以及發(fā)布-訂閱模型,即多個消費者接收自己的消息副本。根據這一特性,可以使用Spark實時流處理系統(tǒng)對消息進行實時在線處理,同時使用Hadoop這種批處理系統(tǒng)進行離線處理,還可以同時將數據進行實時備份,只需要保證這三個操作所使用的消費者屬于不同的消費者即可。
圖6 Kafka消息分發(fā)模型
總之,Kafka是一種處理大量數據的新型消息系統(tǒng),其高吞吐量、高可靠、高可用、易擴展的特性完全適應于能耗分項計量系統(tǒng)。此外,通過利用Kafka同時支持多種處理模型的特點,能耗分項計量系統(tǒng)可以在進行能耗數據在線處理的同時,對能耗數據進行備份和離線處理。
2.3 實時能耗分項計量系統(tǒng)架構
本文設計的實時能耗分項計量系統(tǒng)的整體架構如圖7所示,主要包括后端能耗數據采集部分、Kafka消息系統(tǒng)、Spark Streaming集群、Hadoop集群、前端實時展示應用和前端分析展示應用,以及分類分項能耗數據庫和計量表原始數值數據庫。
流式處理系統(tǒng)主要通過網絡Socket通信來實現與外部IO系統(tǒng)的數據交互。由于網絡通信的不可靠特點,發(fā)送端與接收端需要通過一定的協(xié)議來保證數據包的接收確認和失敗重發(fā)機制。不是所有的IO系統(tǒng)都支持重發(fā),這至少需要實現數據流的持久化,同時還要實現高吞吐和低時延。通過前面的介紹,可以確定Kafka具備這些特點,完全能夠作為實時能耗分項計量系統(tǒng)的外部數據源。
除了把Kafka當成輸入數據源之外,實時能耗分項計量系統(tǒng)還將其作為信息輸出數據源向前端實時展示應用推送相關報警和實時流信息。
圖7 實時能耗分項計量系統(tǒng)整體架構
能耗數據采集部分包括能耗采集儀表和數據采集器。一般來講,每個數據采集器可以連接16到32塊采集儀表。數據采集器負責接收所連接采集儀表發(fā)來的能耗數據,并把數據整理為住建部所制定的能耗數據通信協(xié)議格式[15],然后按照設置的時間間隔發(fā)送到設定的數據接收器。每個數據接收器就是Kafka消息系統(tǒng)的消息生產者,負責把從數據采集器發(fā)來的數據寫入Kafka消息系統(tǒng),從而保證了數據的可靠性。
按照住建部要求,計量表采集到的能耗數據一方面必須寫入計量表原始數值數據庫,同時還需要按儀表、按分項進行拆分并把結果寫入分類分項能耗數據庫。在滿足住建部基本要求的同時,實時能耗分項計量系統(tǒng)還對能耗數據進行實時分析以便能夠及時捕捉能耗異常情況,并報警給用戶。
為了能夠保證能耗數據處理的實時性,實時能耗分項計量系統(tǒng)充分利用Kafka消息系統(tǒng)可以同時支持多個消費者組的能力,為能耗數據消息設置兩個消費者組。一個是運行在Spark Streaming流式計算集群上的能耗數據實時數據拆分程序;另一個則是運行在Spark批處理集群上的計量表原始數值寫入程序。
運行在Spark Streaming集群上的能耗數據拆分程序是實時能耗分項計量系統(tǒng)的核心模塊。數據拆分程序以Kafka消息系統(tǒng)作為能耗數據輸入流進行實時在線處理。首先,數據拆分程序對能耗數據進行分類分項拆分,并形成多個數據流供其他業(yè)務處理模塊使用。第二,數據拆分程序把分類分項拆分結果按照不同時間粒度進行統(tǒng)計,并把統(tǒng)計結果寫入分類分項能耗數據庫。時間粒度分為15 min、小時、天和月。
除了能耗數據拆分程序之外,Spark Streaming集群還可以進行多種實時在線數據處理,比如能耗熱點分析和能耗異常分析。這些能耗數據處理程序并不直接從Kafka消息系統(tǒng)中獲取數據,而是使用能耗數據拆分程序生成的數據流進行數據處理,并把分析結果通過Kafka消息系統(tǒng)提供給前端實時展示應用。
實時能耗分項計量系統(tǒng)的另一部分功能是進行離線數據統(tǒng)計、分析以及數據挖掘。前端能耗分析程序基于能耗數據拆分程序寫入到分類分項能耗數據庫的數據,以及計量表原始數值數據進行各類能耗數據統(tǒng)計、分析以及數據挖掘,從而使用能單位可以掌握詳細能耗使用情況,為制定節(jié)能策略提供科學依據。
因為能耗數據記錄的數量遠遠超過了傳統(tǒng)關系型數據庫可以支持的容量,計量表原始數值數據庫和分類分項能耗數據庫均使用了HBase數據庫。HBase是運行在Hadoop上的NoSQL數據庫,它是一個分布式的和可擴展的面向列的數據庫,可以在一組通用硬件上存儲許多具有數十億行和上百萬列的大表[16]。HBase能夠融合Key-Value數據模式帶來實時查詢的能力,以及通過MapReduce或Spark進行離線處理或者批處理的能力??傊?HBase能夠存儲大量的數據,讓用戶在大量的數據中查詢記錄,并從中獲得綜合分析報告。所以,HBase非常適合于存放計量表原始數據和分類分項能耗數據。HBase不但可以滿足能耗分項計量系統(tǒng)每天幾十萬條記錄的大數據量需求,還可以與Hadoop的MapReduce以及Spark SQL和Spark MLlib結合為用戶提供高效能耗數據分析和數據挖掘工作。
2.4 實時能耗分項計量系統(tǒng)內部結構
實時能耗分項計量系統(tǒng)的內部結構如圖8所示。Spark Streaming集群由多個工作者節(jié)點(Worker Node)組成,每個工作者節(jié)點包含一個或多個Spark Executor。同時,在每個工作節(jié)點還安裝了用來存儲能耗數據的數據庫系統(tǒng)HBase和MySQL,以及數據倉庫系統(tǒng)Hive。HBase用來存儲能耗分類分項數據和計量表原始數值數據;MySQL用來存儲與用能單位和分項計量系統(tǒng)各種設備部署情況的結構化數據;Hive用來按主題、多維度、多粒度對分類分項能耗數據進行存儲和管理,為后期進行離線分析和數據挖掘提供良好的基礎。
圖8 實時能耗分項計量系統(tǒng)內部結構
來自各個數據采集器的能耗數據使用同一個話題(Topic)寫入Kafka消息系統(tǒng)。為了保證實時能耗分項計量系統(tǒng)的吞吐量,以便支持更多的計量表,我們對能耗數據消息話題進行了分區(qū)。通過消息分區(qū)可以提高消息生產者和消息消費者的并發(fā)能力。
在實時能耗分項計量系統(tǒng)中,每個消息分區(qū)將有兩個消息消費者小組:一個是Spark Streaming流式集群的能耗數據拆分程序小組;另一個是Spark批處理集群的計量表原始數值數據寫入程序小組。每個小組都由多個消費者組成,每個消息分區(qū)的數據都會被每個小組中的一個消費者接收。
圖8描述了一個包含兩個Worker Node的Spark 集群,每個Worker Node運行了三個Spark Executor。每個Worker Node上有兩個Executor屬于能耗數據拆分程序小組,另外一個屬于計量表原始數值寫入程序小組。
能耗數據消息話題分成了四個分區(qū),能耗數據拆分程序小組中的每個Executor消費一個消息分區(qū)的數據;計量表原始數據值寫入程序小組中的每個Executor負責消費兩個消息分區(qū)的數據。一般來講,分區(qū)的個數最好是消費者小組中消費者的倍數,也就是說,同小組中的每個消費者負責處理的消息分區(qū)個數是等同的。在實際環(huán)境中消息話題分區(qū)的個數需要按照整個系統(tǒng)連接的計量儀表的個數來確定。
與Receiver方式相比,Direct方式雖然使用較為復雜,但是它能提供更好的靈活性和可靠性,所以本文選用Direct方式。Direct方式使用Kafka的基本API,由Spark Streaming負責記錄在每個消息分區(qū)中的消費位移,也就是已經消費過的消息位置,并保存在Spark系統(tǒng)的檢測點(Check Point)記錄中。使用Direct方式,Spark Streaming會周期性地查詢Kafka,來獲得每個消息分區(qū)的最新的位移,從而定義每個數據塊的數據范圍。當處理消息的作業(yè)啟動時,就會使用Kafka的簡單消費API 來獲取Kafka指定范圍的數據。Spark會創(chuàng)建跟Kafka 分區(qū)一樣多的RDD 分區(qū),并且會并行從Kafka中讀取數據。所以在Kafka 分區(qū)和RDD 分區(qū)之間,有一個一對一的映射關系。采用Direct方式的另外一個優(yōu)勢就是可以利用Kafka保證數據的可靠性,并且可以保證數據是消費一次且僅消費一次。
在每個Spark Executor中,運行著能耗數據拆分程序和多個能耗數據實時處理程序。能耗拆分程序的功能在前面已經介紹。每個能耗數據處理程序基于拆分程序生成的實時數據流完成一定的數據處理工作,并把部分數據處理的結果寫入Kafka消息系統(tǒng)供前端實時展示應用使用,同時還會把一些數據處理結果寫入MySQL數據庫供前端分析系統(tǒng)使用。
能耗用量異常分析程序是我們提供的一個能耗實時數據處理程序,它基于數據拆分程序提供的能耗使用數據流,根據用能單位設置的各類閾值以及正常能耗使用量發(fā)現用能異常情況,并通過Kafka消息系統(tǒng)及時報警給前端實時展示應用。比如,單位給某辦公樓層設置的空調用電的閾值為每小時20度,能耗異常分析程序在對能耗使用數據流進行處理時就會檢測該樓層的空調用電量,當用電量超過每小時20度時,就會產生報警消息通過Kafka提交給前端實時展示應用。同樣的,假定某小區(qū)正常煤氣流量為10 m3/min左右,如果能耗用量異常分析程序發(fā)現該小區(qū)煤氣流量遠遠超過了10 m3/min,那么就有可能是發(fā)生了煤氣管道漏氣。這時,能耗用量異常分析程序就會產生報警消息。
能耗用量熱點分析程序是我們提供的另一個能耗實時數據處理程序,它會實時統(tǒng)計每個計量點的每刻的能耗使用量并通過Kafka消息系統(tǒng)發(fā)布。前端實時展示應用可以獲取感興趣的計量點的流量統(tǒng)計來繪制能耗用量熱點圖,從而可以一目了然地及時了解所關心計量點的能耗使用狀況。
2.5 實時能耗分項計量系統(tǒng)的優(yōu)勢
本文提出的實時能耗分項計量系統(tǒng)充分利用了最先進的大數據技術,特別是流計算技術,并針對能耗分項計量的特點對整體系統(tǒng)架構和內部結構進行了認真的研究與設計。與傳統(tǒng)的能耗分項計量系統(tǒng)比,本文提出的實時能耗分項計量系統(tǒng)具有如下優(yōu)勢:
首先,實時能耗分項計量系統(tǒng)可以同時支持實時在線數據處理和離線數據統(tǒng)計分析,而傳統(tǒng)的能耗分項計量系統(tǒng)只支持對能耗使用情況的離線統(tǒng)計和分析。實時能耗分項計量系統(tǒng)的異常情況實時報警功能和能耗使用熱點實時分析功能,不僅可以使用能單位在發(fā)生能耗異常情況時可以及時采取相應措施,防止異常情況蔓延,還可以讓用能單位隨時掌握整體能耗情況的實時現狀。
其次,實時能耗分項計量系統(tǒng)具有很強的數據處理能力。整體系統(tǒng)架構使用了當前最先進的快速流式處理系統(tǒng)Spark Streaming和具有高可靠、高吞吐量的Kafka消息系統(tǒng)作為實時數據流處理的核心架構。整個數據處理過程是基于內存,而不像傳統(tǒng)能耗分項計量系統(tǒng)需要把數據首先寫入文件系統(tǒng),然后再讀入到內存進行處理,所以,實時能耗分項計量系統(tǒng)的處理效率會比傳統(tǒng)能耗分項計量系統(tǒng)提高百倍以上。這意味著,在同樣的硬件配置情況下,實時能耗分析計量系統(tǒng)可以支持的能耗采集點數可以提高上百倍。
第三,實時能耗分項計量系統(tǒng)具有很強的可擴展性。實時能耗分項計量系統(tǒng)架構中的Kafka消息系統(tǒng)、Spark系統(tǒng)、HBase系統(tǒng)和Hadoop系統(tǒng)都是分布式集群結構,并具有很強的擴展能力。所以,在使用實時能耗分項計量系統(tǒng)的每個階段,用戶只需要部署能夠滿足當時能耗監(jiān)控需求的設備即可,而不需要考慮后期可能的需求。這一方面可以節(jié)省用戶的投資成本,還減少了用戶初期部署的設計負擔。
第四,實時能耗分項計量系統(tǒng)提供快速數據挖掘能力。除了強大的實時數據處理能力以外,借助于Spark 平臺,實時能耗分項計量系統(tǒng)還可以利用Spark MLlib進行深度數據挖掘,發(fā)現復雜的能耗數據之間的關聯關系,從而為制定有效的節(jié)能措施提供科學依據?;赟park MLlib的數據挖掘效率會遠遠高于基于MapReduce模式的Mahout數據挖掘系統(tǒng)的效率。
第五,實時能耗分項計量系統(tǒng)可以很容易增加新的業(yè)務處理功能。在當前系統(tǒng)中,提供了能耗異常分析和能耗用量熱點分析兩個實時處理功能,但是今后可以根據用戶需求很方便地添加新的業(yè)務處理能力。新添加的業(yè)務處理功能將會與原有的處理并行進行,并不會影響現有的實時業(yè)務處理能力。
為了檢驗實時能耗分項計量系統(tǒng)進行分項計量和實時數據處理的能力,實際部署了一套實時能耗分項計量系統(tǒng),在對各種參數進行優(yōu)化之后,進行了一系列的測試。
3.1 測試環(huán)境
測試環(huán)境是運行在云平臺上的7臺虛擬機組成。每臺虛擬機的配置為8核CPU,25 GB內存,1 TB HDD磁盤。圖9描述了實時能耗分項計量系統(tǒng)各個模塊的部署情況。
圖9 系統(tǒng)測試環(huán)境
系統(tǒng)部署的指導思想是要保證整個系統(tǒng)的可靠性和可擴充性,并且保證節(jié)點之間的負載均衡性。具體部署情況如下:在兩臺服務器上部署了Hadoop的Name Node和HBase的Master Server;三臺服務器上部署了Zookeeper、Spark、Hadoop的Data Node和HBase的Master Server;最后兩臺服務器上部署了Kafka、Spark、Hadoop的Data Node和HBase的Master Server。
3.2 測試結果與分析
實時能耗分項計量系統(tǒng)的性能指標主要考慮的是系統(tǒng)的吞吐量和處理數據的延遲時間。系統(tǒng)的吞吐量一般用兩個指標來衡量:一是單位時間內系統(tǒng)能夠處理的能耗數據的條數;二是系統(tǒng)處理一條能耗數據所需要的時間。單位時間內處理的數據條數越多說明系統(tǒng)的吞吐量越高,系統(tǒng)處理數據的能力越強。處理能耗數據的延遲時間的指標也有兩個:一個是從接收到一條能耗數據到開始處理該條數據之間的時間間隔稱為調度延遲時間(Scheduling Delay);另一個是從接收到一條能耗數據到處理完該條數據之間的時間間隔稱為總延遲時間(Total Delay)。處理能耗數據的延遲時間越小,說明系統(tǒng)處理數據越及時,系統(tǒng)實時性越強。
在測試中,通過給實時能耗分項計量系統(tǒng)的Kafka消息系統(tǒng)加載實際數據來測試系統(tǒng)的吞吐量和處理數據的延遲時間,測試結果如圖10所示。圖中展示的測試運行了6 min 27 s,每秒加載一組能耗數據,每組數據大約包含140條能耗記錄,總共處理了387組數據,64 968條能耗記錄。
圖10(a)展示的是給系統(tǒng)加載能耗數據的速率(Input Rate)??梢钥闯鼋o系統(tǒng)加載能耗數據的平均速率為每秒167.88條記錄,瞬間最高值達到了每秒300條以上,絕大多數數據都是按每秒140到200條數據的速率發(fā)送的。
圖10(b)展示的是數據的調度延遲時間。盡管顯示的平均調度延遲時間為10 ms,但從圖中可以看出這主要是由于在測試剛開始啟動時,第一批數據有一個2 s延遲而導致的。從右圖可以看出,其余批次數據的調度延遲平均值在0.2 ms以內。
圖10(c)展示的是處理一批能耗數據所需要的時間。圖中顯示處理每批數據的平均時間為133 ms。如果考慮到除去系統(tǒng)剛啟動運行的第一批數據,那么平均處理每一批數據的時間會在100 ms以內。從圖的形狀來看,除了第一批數據以外,系統(tǒng)整個處理過程非常平穩(wěn)。
圖10(d)展示的是系統(tǒng)處理能耗數據的總延遲的平均時間為143 ms。類似于調度延遲時間,總延遲平均時間也因為第一批數據的延遲而拉高。如果剔除第一批數據,其余批次數據的總延遲時間均在100 ms以內。
圖11以表格的形式展示了測試最后26批次數據的結果,包括每批數據的條數和提交時間,也就是包含多少條能耗數據、調度延遲時間、處理時間和總延遲時間。通過圖11的數據,可以更進一步佐證上面對測試數據的分析結果。從圖11可以看出每批數據平均包含146.88條能耗數據;平臺調度延遲時間為0.34 ms;每批數據的平均處理時間為117.65 ms;平均總延遲時間為118 ms。所以,實時能耗分項計量的吞吐量為每秒處理1 248條記錄(146.88/117.65×1 000)。
圖11的數據是在系統(tǒng)度過了初始階段達到穩(wěn)定以后的數據,結合圖10的整體情況,可以知道圖11的數據更能代表實時能耗分項計量系統(tǒng)的特性。
圖10 系統(tǒng)實時性測試結果
圖11 系統(tǒng)實時性詳細測試結果
3.3 測試結論
從上面的實驗結果可以看出,在實時能耗分項計量系統(tǒng)啟動以后,只需要處理完第一批數據以后,就能達到穩(wěn)定的運行狀態(tài),大約3 s。平均實時能耗分項計量系統(tǒng)的吞吐量為每秒處理1 248條記錄,平臺調度延遲時間為0.34 ms;每批數據的平均處理時間為117.65 ms;平均總延遲時間為118 ms。所以,實時能耗分項計量系統(tǒng)具有很高的吞吐量,實時性很強,并且系統(tǒng)數據處理速率很平穩(wěn)。
按照國家住建部分項計量規(guī)則要求,每塊分項計量儀表需要每15 min提交一次數據;而在15 min時間內,實時能耗分項計量系統(tǒng)可以處理超過100萬條(15×60×1 200)數據。也就是說,在現有的系統(tǒng)配置環(huán)境下,實時能耗分項計量系統(tǒng)可以支持100萬塊儀表。因為傳統(tǒng)的能耗分項計量系統(tǒng)需要先把數據寫入磁盤文件,然后再讀入進行數據處理,并且沒有采用大數據并發(fā)處理技術,所以每套系統(tǒng)能支持的分項計量儀表一般都在1 000塊左右,只適合于單個企事業(yè)單位的分項計量工作。實時能耗分項計量系統(tǒng)將處理能耗數據的能力提升了上千倍,完全可以滿足同一個城市的所有公共事業(yè)單位提供分項計量服務。
本文提出了一種基于Spark Streaming和Apache Kafka模塊構建的用于能耗分項計量的實時流式處理系統(tǒng),簡稱實時能耗分項計量系統(tǒng)。它能夠滿足能耗分項計量數據產生快、實時性強、數據量大的數據處理需求。與傳統(tǒng)數據處理架構不同,實時能耗分項計量系統(tǒng)不僅提供離線數據的統(tǒng)計與分析,并且根據業(yè)務需求對數據進行實時在線處理,在數據流動的過程中實時地捕捉異常信息并進行處理,最終把結果保存或者分發(fā)給需要的組件。本文詳細描述了實時能耗分項計量系統(tǒng)的整體架構和內部結構,闡述了其主要特點,并通過實際測試證明了其強大的數據處理能力和實時性。
從功能方面來講,與傳統(tǒng)的能耗分項計量系統(tǒng)相比,實時能耗分項計量系統(tǒng)的最大優(yōu)點就是在支持離線能耗統(tǒng)計的同時,還可以支持實時在線數據處理和深度數據挖掘。比如,可以對能耗數據流進行實時分析,發(fā)現能耗用量異常情況,及時報警給用戶,以便用能單位可以及時采取相應措施,防止異常情況蔓延。再比如,實時能耗分項計量系統(tǒng)還可以實時統(tǒng)計各計量點能耗情況并實時展示給用戶,使用能單位及時掌握整體能耗的實時現狀。
從性能方面來講,本文提出的實時能耗分項計量系統(tǒng)進行能耗數據處理的能力遠遠超過傳統(tǒng)的能耗分項計量系統(tǒng),能夠支持能耗數據采集點的個數高出上千倍。并且,實時能耗分項計量系統(tǒng)具有很強的擴展能力,可以通過增加服務器和存儲設備來提高其總體處理能力,從而可以支持更多的能耗數據采集點。
總之,本文提出的實時能耗分項計量系統(tǒng)不論從性能方面、功能方面,還是從系統(tǒng)的可擴展方面都遠優(yōu)于傳統(tǒng)的能耗分項計量系統(tǒng)。本系統(tǒng)的第一版開發(fā)已經完成,已經在2016年開始在四川省進行實地部署。此外,本文提出的實時流式數據處理系統(tǒng)還可以應用于其他流式數據處理場合,比如股市走向分析、氣象數據測控、網站用戶行為分析和公路卡口過車數據分析等。
References)
[1] 清華大學建筑節(jié)能研究中心. 中國建筑節(jié)能年度發(fā)展研究報告2010[M]. 北京:中國建筑工業(yè)出版社, 2010:105-130.(Building energy conservation research center of tsinghua university. Annual Report of China Building Energy Conservation 2010[M]. Beijing:China Architecture and Building Press, 2010:105-130.)
[2] 魏慶芃. 大型公共建筑能耗分項計量實時監(jiān)測分析系統(tǒng)EMS-II的發(fā)展[J]. 建筑, 2009(3):34-37.(WEI Q P. Development of the detailed classification energy consumption measurement system for large public building EMS-II [J]. Construction and Architecture, 2009(3):34-37.)
[3] EMC-2000能源管理系統(tǒng)[EB/OL]. [2016- 09- 10]. http://www.hysine.cn/web/list/369/1.html.(EMC-2000 energy management system [EB/OL]. [2016- 09- 10]. http://www.hysine.cn/web/list/369/1.html.)
[4] 黃斌, 杜運東, 曹雪華. 基于Acrel-5000的大型公共建筑能耗監(jiān)測系統(tǒng)設計與應用[J]. 智能建筑電氣技術, 2009, 3(5):47-50.(HUANG B, DU Y D, CAO X H. Design and application of large public building energy consumption monitoring system Acrel-5000[J]. Electrical Technology of Intelligent Building, 2009, 3(5):47-50.)
[5] 國家機關辦公建筑和大型公共建筑能耗監(jiān)測系統(tǒng)——分項計量數據采集技術導則[S]. 北京:中華人民共和國住房和城鄉(xiāng)建設部, 2008:1-25.(Government offices and large public buildings energy consumption monitoring system — the technical guidance for detailed classification energy data collection[S]. Beijing: Ministry of Housing and Urban-Rural Development of the People’s Republic of China, 2008:1-25.
[6] 國家機關辦公建筑和大型公共建筑能耗監(jiān)測系統(tǒng)——省、市級數據中心數據庫結構文檔[S]. 北京:中華人民共和國住房和城鄉(xiāng)建設部, 2009:1-12.(Government offices and large public buildings energy consumption monitoring system — provincial and municipal data center database structure document[S]. Beijing: Ministry of Housing and Urban-Rural Development of the People’s Republic of China, 2009:1-12.)
[7] Spark programming guide[EB/OL]. [2016- 07- 27]. http://spark.apache. org /docs/latest/programming-guide.html.
[8] Spark streaming programming guide [EB/OL]. [2016- 07- 27]. https://sp ark.apache.org/docs/latest/programming-guide.html.
[9] 陸嘉恒. Hadoop實戰(zhàn)[M]. 北京:機械工業(yè)出版社, 2012:1-121.(LU J H. Hadoop in Action[M]. Beijing: China Machine Press, 2012:1-121.)
[10] KARAU H, KONWINSKI A, WENDELL P, et al. Spark快速大數據分析[M]. 王道遠, 譯. 北京: 人民郵電出版社, 2015: 161-185.(KARAU H, KONWINSKI A, WENDELL P, et al. Learning Spark: Lighting-Fast Data Analysis [M]. WANG D Y, translated. Beijing:Posts and Telecom Press, 2015: 161-185.)
[11] 夏俊鸞, 邵賽賽. Spark Streaming:大規(guī)模流式數據處理的新貴[J]. 程序員, 2014(2):44-48.(XIA J L, SHAO S S. Spark streaming: large-scale streaming data processing upstart[J]. Programmer, 2014(2):44-48.)
[12] Apache spark FAQ [EB/OL]. [2016- 08- 04]. https://spark.apache.org/faq.html.
[13] Apache Kafka: a high-throughput distributed messaging system [EB/OL]. [2016- 01- 09]. http://kafka.apache.org/documentation.html.
[14] KREPS J, NARKHED N, RAO J. Kafka: a distributed messaging system for log processing[C]// NetDB2011: Proceedings of the 6th International Workshop on Networking Meets Databases. New York: ACM, 2011: Article No. 12.
[15] 國家機關辦公建筑和大型公共建筑能耗監(jiān)測系統(tǒng)——數據上傳XML格式文檔[S]. 北京:中華人民共和國住房和城鄉(xiāng)建設部, 2009:55-59.(Government offices and large public buildings energy consumption monitoring system — XML format for data uploading [S]. Beijing: Ministry of Housing and Urban-Rural Development of the People’s Republic of China, 2009:55-59.)
[16] GEORGE L. HBase權威指南[M]. 代志遠, 劉佳, 蔣杰, 譯. 北京:人民郵電出版社, 2013:5-25.(GEORGE L. HBase: the Definitive Guide[M]. DAI Z Y, LIU J, JIANG J, translated. Beijing:Posts and Telecom Press, 2013:5-25.)
WU Zhixue, born in 1960, Ph. D., professor. His research interests include cloud computing, streaming data processing, data mining.
Real-time detailed classification energy consumption measurement system based on Spark Streaming
WU Zhixue1,2*
(1. Chengdu Wuzhou Handge Technology Limited, Chengdu Sichuan 611731, China;2. School of Information Security Engineering, Chengdu University of Information Technology, Chengdu Sichuan 610225, China)
Detailed classification energy consumption measurement can discover energy consuming issues more accurately, timely and effectively, which can form and implement the most effective energy-saving measures. Detailed classification energy measurement system needs to calculate energy consumption amounts at multiple time scales according to detailed classification coding. Not only does it need to complete the tasks timely, but also need to deal with data aggregating, data de-duplication and data joining operations. Due to the fast speed of the data being generated, the requirement of the data being processed in real-time, and the big size of the data volume, it is difficult to store the data to a database system first, and then to process the data afterwards. Therefore, the traditional data processing infrastructure cannot fulfil the requirements of detailed classification energy consumption measurement system. A new real-time detailed classification energy consumption measurement system based on Spark Streaming technologies was designed and implemented, the system infrastructure and the internal structure of the system were introduced in detail, and its real-time data processing capabilities were proved through experiments. Different from the traditional ways, the proposed system processes energy consumption data in real-time to capture any unusual behaviour timely; at the same time, it separates the data and calculates the consumption usages according to the detailed classification coding, and stores the results to a database system for offline analysis and data mining, which can effectively solve the previously mentioned problems encountered in the data processing process.
stream computing; detailed classification energy consumption measurement; Spark Streaming; Apache Kafka; big data
2016- 10- 10;
2016- 12- 21。
武志學(1960—),男,山西河津人,教授,博士,主要研究方向:云計算、流式數據處理、數據挖掘。
1001- 9081(2017)04- 0928- 08
10.11772/j.issn.1001- 9081.2017.04.0928
TP391
A