胡志寶 陸會(huì)明
(華北電力大學(xué)控制與計(jì)算機(jī)工程學(xué)院,北京102206)
工業(yè)生產(chǎn)過(guò)程中,我們希望所有生產(chǎn)環(huán)節(jié)都能夠安全高效進(jìn)行,但是實(shí)際生產(chǎn)中,某些物理量可能會(huì)出現(xiàn)較大波動(dòng),使得生產(chǎn)過(guò)程不夠穩(wěn)定甚至發(fā)生危險(xiǎn),所以在工業(yè)生產(chǎn)過(guò)程中,我們會(huì)對(duì)某些環(huán)節(jié)進(jìn)行監(jiān)測(cè)記錄,以便在出現(xiàn)特殊情況時(shí)及時(shí)采取措施來(lái)保證生產(chǎn)過(guò)程的安全。當(dāng)前傳感器、多媒體、數(shù)據(jù)庫(kù)和無(wú)線網(wǎng)絡(luò)技術(shù)在工業(yè)生產(chǎn)中得到了廣泛應(yīng)用,由此開(kāi)發(fā)了工業(yè)生產(chǎn)監(jiān)控系統(tǒng)、工業(yè)生產(chǎn)流程處理系統(tǒng)等,這些系統(tǒng)在運(yùn)行中會(huì)積累海量的數(shù)據(jù)。通過(guò)對(duì)這些數(shù)據(jù)進(jìn)行挖掘,我們可以知道生產(chǎn)過(guò)程是否穩(wěn)定,哪些量容易超過(guò)限制,哪些環(huán)節(jié)需要重點(diǎn)關(guān)注,找出生產(chǎn)過(guò)程的薄弱點(diǎn)對(duì)方案進(jìn)行改進(jìn)。由此可見(jiàn),數(shù)據(jù)的挖掘分析可以提高工業(yè)生產(chǎn)決策的準(zhǔn)確度,進(jìn)一步改進(jìn)工業(yè)生產(chǎn)效率[1]。
Spark[2]是專(zhuān)為大數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎,是一個(gè)基于內(nèi)存計(jì)算的可擴(kuò)展的開(kāi)源集群計(jì)算系統(tǒng),解決了MapReduce[3]在大量的網(wǎng)絡(luò)傳輸和磁盤(pán)I/O 時(shí)效率低的問(wèn)題。Spark 具有運(yùn)行速度快、易用性、通用性、兼容性強(qiáng)的四大特點(diǎn)[4]。Spark SQL[5]是Spark 用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊,它提供了一個(gè)編程抽象叫做DataFrame,并且具有作為分布式SQL查詢(xún)引擎的作用,其使用類(lèi)SQL 的語(yǔ)法作為高層的數(shù)據(jù)操縱API,極大地降低了數(shù)據(jù)分析工作的難度。
Spark SQL 是Spark 框架的重要組成部分,主要用于結(jié)構(gòu)化數(shù)據(jù)處理和對(duì)Spark 數(shù)據(jù)執(zhí)行類(lèi)SQL 的查詢(xún),其數(shù)據(jù)處理功能可以從以下三個(gè)過(guò)程來(lái)體現(xiàn):
2.1.1 抽 ?。‥xtract):Spark SQL 可 以 從 多 種 文 件 系 統(tǒng)(HDFS[6]、S3. 本地文件系統(tǒng)等)、關(guān)系型數(shù)據(jù)庫(kù)(MySQL、Oracle等)或NoSQL 數(shù)據(jù)庫(kù)(Cassandra、HBase[7]、Druid 等)中獲取數(shù)據(jù),Spark SQL 支持的文件類(lèi)型可以是CSV、JSON、XML 等。
2.1.2 轉(zhuǎn)換(Transform):在數(shù)據(jù)清洗方面,比如空值處理、拆分?jǐn)?shù)據(jù)、規(guī)范化數(shù)據(jù)格式、數(shù)據(jù)替換等,Spark SQL 能夠準(zhǔn)確高效地完成這類(lèi)轉(zhuǎn)換操作。
2.1.3 加載(Load):在將數(shù)據(jù)處理完成之后,Spark SQL 還可以將數(shù)據(jù)存儲(chǔ)到各種數(shù)據(jù)源中。
除了上述功能,SparkSQL 還可以和Spark 的其他模塊搭配使用,完成各種各樣更為復(fù)雜的工作,比如和SparkStreaming[8]搭配處理實(shí)時(shí)的數(shù)據(jù)流,和MLlib[9]搭配完成一些機(jī)器學(xué)習(xí)的應(yīng)用[10]。Spark SQL 無(wú)縫地將SQL 查詢(xún)與Spark 程序混合,允許開(kāi)發(fā)人員將結(jié)構(gòu)化數(shù)據(jù)作為Spark 中的分布式數(shù)據(jù)集進(jìn)行查詢(xún),在Python,Scala 和Java 中集成了API,使得開(kāi)發(fā)人員可以輕松地運(yùn)行SQL 查詢(xún)以及復(fù)雜的分析算法。
Spark SQL 對(duì)SQL 語(yǔ)句的處理過(guò)程和關(guān)系型數(shù)據(jù)庫(kù)是類(lèi)似的,會(huì)先對(duì)SQL 語(yǔ)句進(jìn)行解析形成Tree,然后使用Rule 對(duì)Tree進(jìn)行綁定、優(yōu)化等處理,通過(guò)模式匹配對(duì)不同類(lèi)型的節(jié)點(diǎn)采用不同的操作。Spark SQL 模塊劃分為Core、Catalyst、Hive[11]和Hive-ThriftServer 四大模塊,每個(gè)模塊的作用如下:
2.2.1 Core:負(fù)責(zé)處理數(shù)據(jù)的輸入/輸出,從不同的數(shù)據(jù)源獲取數(shù)據(jù),然后將查詢(xún)結(jié)果輸出成DataFrame。
2.2.2 Catalyst:負(fù)責(zé)處理查詢(xún)語(yǔ)句的整個(gè)過(guò)程,包括解析、綁定、優(yōu)化、物理計(jì)劃等,是Spark SQL 最為核心的部分,其性能優(yōu)劣將決定整體的性能。
2.2.3 Hive:負(fù)責(zé)對(duì)Hive 數(shù)據(jù)的處理。
2.2.4 Hive-ThriftServer:提供CLI 和JDBC/ODBC 接口等。
圖1 Spark SQL 執(zhí)行過(guò)程
圖2 讀取的數(shù)據(jù)
這四個(gè)模塊共同工作,完成數(shù)據(jù)的獲取和SQL 語(yǔ)句的執(zhí)行任務(wù),圖1 所示即為Spark SQL 執(zhí)行SQL 語(yǔ)句的過(guò)程。
對(duì)于得到的工業(yè)生產(chǎn)中的數(shù)據(jù),通過(guò)對(duì)數(shù)據(jù)進(jìn)行處理挖掘,得到我們需要的統(tǒng)計(jì)量,常見(jiàn)的統(tǒng)計(jì)量有均值、均方差、極值等。超限統(tǒng)計(jì)也是非常重要的一項(xiàng)統(tǒng)計(jì)內(nèi)容,包括統(tǒng)計(jì)各個(gè)超限時(shí)間段及超限的持續(xù)時(shí)間。
本文采用的是Java+ Spark SQL 的方式來(lái)實(shí)現(xiàn)數(shù)據(jù)的讀取與處理,數(shù)據(jù)讀取需要先創(chuàng)建SparkSession 服務(wù),然后使用SparkSession 服務(wù)讀取數(shù)據(jù)。數(shù)據(jù)的第一列是日期列,讀取時(shí)將日期轉(zhuǎn)化為時(shí)間戳的形式,單位是毫秒。數(shù)據(jù)讀取的代碼如下:
圖2 為數(shù)據(jù)讀取結(jié)果。
timestamp 是時(shí)間列,對(duì)應(yīng)著獲得每個(gè)數(shù)據(jù)的采樣時(shí)間點(diǎn),其余列是在采樣時(shí)間點(diǎn)各個(gè)過(guò)程量的值。
3.2.1 常規(guī)統(tǒng)計(jì)量
常規(guī)統(tǒng)計(jì)量包括均值、均方差、最大值、最小值。通過(guò)均值和均方差我們可以知道過(guò)程量是否穩(wěn)定在我們?cè)O(shè)置的輸入附近,生產(chǎn)過(guò)程是否穩(wěn)定。這些統(tǒng)計(jì)量可以直接調(diào)用API 函數(shù)即可實(shí)現(xiàn),過(guò)程如圖3。
圖3 常規(guī)統(tǒng)計(jì)流程
獲取常規(guī)統(tǒng)計(jì)量時(shí)與時(shí)間列無(wú)關(guān)且對(duì)時(shí)間列進(jìn)行常規(guī)統(tǒng)計(jì)并沒(méi)有實(shí)際意義,所以先去掉時(shí)間列,然后對(duì)剩余列進(jìn)行常規(guī)統(tǒng)計(jì),實(shí)現(xiàn)的代碼如下,
data = data.drop("timestamp");
data = data.describe()。
3.2.2 超限統(tǒng)計(jì)
生產(chǎn)過(guò)程中一些過(guò)程量會(huì)有對(duì)應(yīng)的上限值,過(guò)程量超過(guò)上限值時(shí)系統(tǒng)就會(huì)報(bào)警,就需要進(jìn)行調(diào)節(jié)使過(guò)程量回歸正常值,一旦超限時(shí)間持續(xù)太長(zhǎng)就會(huì)出現(xiàn)危險(xiǎn)。超限統(tǒng)計(jì)就是對(duì)這種情況進(jìn)行統(tǒng)計(jì),統(tǒng)計(jì)超過(guò)限定值的持續(xù)時(shí)間以及最大超限時(shí)長(zhǎng)。實(shí)際生產(chǎn)過(guò)程中會(huì)設(shè)置死區(qū)值,即過(guò)程量的值從超限狀態(tài)回歸正常值時(shí),如果僅是下降穿過(guò)設(shè)定的上限值并不算回歸正常,還需要繼續(xù)下降一定的量才算回歸正常,繼續(xù)下降的這部分為死區(qū)。
超限統(tǒng)計(jì)牽扯到了時(shí)間段的統(tǒng)計(jì),難點(diǎn)在于如何確定每一段超限時(shí)間段的起始時(shí)間點(diǎn)和終止時(shí)間點(diǎn),只要得到起始時(shí)間點(diǎn)和終止時(shí)間點(diǎn)就可以計(jì)算出持續(xù)時(shí)長(zhǎng)??梢韵炔豢紤]死區(qū)的存在,僅考慮上限值,找出所有超過(guò)上限值的數(shù)據(jù),然后再判斷這些數(shù)據(jù)相鄰行之間對(duì)應(yīng)的時(shí)間點(diǎn)在時(shí)間上是否以采樣周期等距連續(xù)的,從而找出每一段連續(xù)的超限數(shù)據(jù),記錄下來(lái)起始時(shí)間點(diǎn)。然后將上限值減去死區(qū)值得到的值作為新的上限值,以相同方法找出每一段超限數(shù)據(jù),記錄下來(lái)結(jié)束時(shí)間點(diǎn)。將兩次記錄下來(lái)的時(shí)間點(diǎn)進(jìn)行比較,可以得到考慮死區(qū)時(shí)每一段超限時(shí)間的起始時(shí)間點(diǎn)和結(jié)束時(shí)間點(diǎn)。解決步驟如下:
(1)將統(tǒng)計(jì)的數(shù)據(jù)與上限值進(jìn)行比較,取出所有超過(guò)上限值的數(shù)據(jù)。
(2)判斷這些數(shù)據(jù)對(duì)應(yīng)的時(shí)間點(diǎn)是否以采樣周期等距連續(xù),找出連續(xù)的各個(gè)時(shí)間段。
(3)記錄找出的各個(gè)時(shí)間段的起始時(shí)間點(diǎn),用列表list1 保存下來(lái)。
(4)上限值減去死區(qū)值作為新的上限值,重復(fù)(1)、(2)得到新的連續(xù)時(shí)間段,然后判斷得到的各個(gè)時(shí)間段內(nèi)的數(shù)據(jù)是否存在超限數(shù)據(jù),保留存在超限數(shù)據(jù)的時(shí)間段,記錄保留的時(shí)間段的結(jié)束時(shí)間點(diǎn),用列表list2 保存下來(lái)。
(5)將兩個(gè)列表中的元素進(jìn)行比較,確定在考慮死區(qū)時(shí)的每一段超限時(shí)間段的起始時(shí)間點(diǎn)列表和終止時(shí)間點(diǎn)列表,從而得到超限時(shí)間段。
得到超限時(shí)間段之后,即可計(jì)算出每段超限時(shí)間段的持續(xù)時(shí)長(zhǎng),通過(guò)比較得到最大時(shí)長(zhǎng),通過(guò)求和求出總的超限時(shí)長(zhǎng)。超限統(tǒng)計(jì)流程如圖4 所示。
圖5 常規(guī)統(tǒng)計(jì)結(jié)果
圖4 超限統(tǒng)計(jì)流程
超限統(tǒng)計(jì)的代碼主要包括列表list1 和列表list2 的獲取,列表的獲取分別編寫(xiě)了一個(gè)函數(shù)來(lái)實(shí)現(xiàn)。
(1)獲取list1 的函數(shù)及其參數(shù)如下:
public List<List<Long>>limit(Dataset data, Float set, String tag, Long T, SparkSession spark);
函數(shù)的參數(shù)依次分別是被統(tǒng)計(jì)的數(shù)據(jù)、設(shè)定值、被統(tǒng)計(jì)的列的名稱(chēng)、采樣周期、SparkSession 服務(wù)。返回值為步驟(2)中各個(gè)時(shí)間段的開(kāi)始時(shí)間點(diǎn)組成的列表和結(jié)束時(shí)間點(diǎn)組成的列表,list1 為開(kāi)始時(shí)間點(diǎn)組成的列表。函數(shù)的主要代碼語(yǔ)句與說(shuō)明如下,
data = spark.sql (“select timestamp from tempTable where ”+tag+“>”+set+“”); //取出超過(guò)設(shè)定值的數(shù)據(jù)
data = spark.sql(“select *, timestamp - lag(timestamp,1)over(order by timestamp) as diff from tempTable”); //將取出來(lái)的數(shù)據(jù)對(duì)應(yīng)的采樣時(shí)間點(diǎn)相鄰行做差
data = spark.sql(“select timestamp,(case when diff = ”+T+“then 1 else 0 end) as status from tempTable”); //若上一步求得的差值等于采樣周期,則標(biāo)記該行狀態(tài)為否則為0,生成狀態(tài)列data = spark.sql(“select timestamp,status,lead(status)over(order by timestamp) as lead from tempTable”); //將狀態(tài)列向前移動(dòng)一行得到新的狀態(tài)列
根據(jù)兩列狀態(tài)列找出各個(gè)連續(xù)時(shí)間段的開(kāi)始時(shí)間點(diǎn)和結(jié)束時(shí)間點(diǎn)
Dataset starttime = data.select(“timestamp”).filter(“status = 0 and lead = 1”);//開(kāi)始時(shí)間表
Dataset endtime = data.select(“timestamp”).filter(“status = 1 and lead = 0”);//結(jié)束時(shí)間表
獲得的開(kāi)始時(shí)間表和結(jié)束時(shí)間表是dataframe,還需要將其轉(zhuǎn)換為列表,開(kāi)始時(shí)間表轉(zhuǎn)化的列表即為list1。
(2)獲取list2 的函數(shù)及其參數(shù)說(shuō)明如下:
public List<Long>death(Dataset data, Float set, Float value,String tag, Long T, SparkSession spark)
函數(shù)的參數(shù)依次分別是被統(tǒng)計(jì)的數(shù)據(jù)、設(shè)定值、設(shè)定值減去死區(qū)值得到的數(shù)值,被統(tǒng)計(jì)的列的名稱(chēng)、采樣周期、SparkSession服務(wù)。返回值分別為步驟(3)中保留的各個(gè)時(shí)間段的結(jié)束時(shí)間點(diǎn)組成的列表,即為list2 的值。函數(shù)主要代碼語(yǔ)句與說(shuō)明如下:
List<Long>s = limit(data,value,tag,T,spark).get(0); //調(diào)用limit函數(shù)求出各個(gè)連續(xù)時(shí)間段的開(kāi)始時(shí)間點(diǎn)
List<Long>e = limit (data,value,tag,T,spark).get (1); //調(diào)用limit 函數(shù)求出各個(gè)連續(xù)時(shí)間段的開(kāi)始時(shí)間點(diǎn)
確定各個(gè)時(shí)間段是否存在超限數(shù)據(jù),保留存在超限數(shù)據(jù)的時(shí)間段的結(jié)束時(shí)間點(diǎn)的集合作為返回值,即為list2 的值。
要想求出考慮死區(qū)情況的超限時(shí)間段,還需要將已經(jīng)求出的list1 和list2 進(jìn)行比較。list2 中的元素就是最終要求的超限時(shí)間段的結(jié)束時(shí)間點(diǎn),然后將list1 中的元素與list2 中的元素依次比較來(lái)確定超限時(shí)間段的開(kāi)始時(shí)間點(diǎn)。在得到各個(gè)超限時(shí)間段的開(kāi)始和結(jié)束時(shí)間點(diǎn)之后,將列表轉(zhuǎn)換為DataFrame,開(kāi)始列與結(jié)束列做差得到持續(xù)時(shí)長(zhǎng)。
常規(guī)統(tǒng)計(jì)的結(jié)果如圖5 所示,超限統(tǒng)計(jì)選擇的是DOMAIN1:UNITMW 列,設(shè)定值為520,死區(qū)值為1,統(tǒng)計(jì)結(jié)果如圖6 所示。
圖6 超限時(shí)間段及其持續(xù)時(shí)長(zhǎng)
本文利用大數(shù)據(jù)處理技術(shù)Spark SQL 對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì),不僅統(tǒng)計(jì)了常用的一些統(tǒng)計(jì)量如均值、最值、均方差等,還進(jìn)行了超限統(tǒng)計(jì),有助于我們了解統(tǒng)計(jì)的數(shù)據(jù)所在的時(shí)間段內(nèi)工業(yè)生產(chǎn)的狀態(tài)是否穩(wěn)定,有沒(méi)有按照預(yù)期進(jìn)行工作。對(duì)于得到的工業(yè)生產(chǎn)的數(shù)據(jù),我們還可以利用這些數(shù)據(jù)進(jìn)行建模,然后對(duì)模型進(jìn)行優(yōu)化、提高生產(chǎn)效率。同時(shí)Spark SQL 技術(shù)在數(shù)據(jù)處理方面有著很大的優(yōu)勢(shì),計(jì)算速度快、通用性高,還可以用于數(shù)據(jù)管理查詢(xún)系統(tǒng)。