摘要:隨著互聯(lián)網技術在工業(yè)生產中廣泛應用,工業(yè)互聯(lián)網的發(fā)展突飛猛進。在工業(yè)生產中,企業(yè)為了對工業(yè)大數(shù)據(jù)進行更好的采集、分析和預處理,利用大數(shù)據(jù)技術搭建大數(shù)據(jù)集群來完成各個生產環(huán)節(jié)?;贖adoop的高可用分布式框架已經成為很多企業(yè)在集群搭建中的首選。文章在基于高可用Hadoop組件基礎上,搭建了Hive、HBase、Spark、Flink、Kafka等大數(shù)據(jù)生態(tài)系統(tǒng)中一些重要組件,用于對數(shù)據(jù)的存儲、采集、抽取、清洗、預處理和分析等操作,幫助企業(yè)在生產過程中完善生產環(huán)節(jié),提高生產效率。
關鍵詞:工業(yè)大數(shù)據(jù);Hadoop集群搭建;數(shù)據(jù)處理
中圖分類號:TP311" 文獻標志碼:A
基金項目:2023年保定市科技計劃項目;項目名稱:基于高可用集群和隨機森林算法的工業(yè)大數(shù)據(jù)分析平臺;項目編號:2311ZG018。
作者簡介:張艷敏(1985— ),女,講師,碩士;研究方向:大數(shù)據(jù)技術,軟件技術。
0" 引言
在工業(yè)生產過程中,各個生產環(huán)節(jié)產生的數(shù)據(jù)越來越多,這些數(shù)據(jù)大多是非結構化數(shù)據(jù),傳統(tǒng)的關系型數(shù)據(jù)庫已無法滿足對這些數(shù)據(jù)的存儲與處理,因此,文章利用大數(shù)據(jù)技術原理搭建大數(shù)據(jù)高可用集群來實現(xiàn)工業(yè)大數(shù)據(jù)的采集與存儲等操作,集群中包含大數(shù)據(jù)生態(tài)系統(tǒng)中一些常用的組件。在已搭建的集群中通過Spark技術實現(xiàn)對離線數(shù)據(jù)的抽取、清洗和預處理,利用Flink技術對實時數(shù)據(jù)進行分析與存儲。整個生產過程在大數(shù)據(jù)集群環(huán)境中運轉流暢,最終達到了為企業(yè)節(jié)約成本、創(chuàng)造更多有益價值的目的。
1" 系統(tǒng)整體設計
根據(jù)企業(yè)實際生產場景,本文搭建了基于Hadoop HA高可用的集群[1],集群中包含Hive、HBase、Spark、Flink、Redis和MySQL等組件,實現(xiàn)對工業(yè)生產中設備信息(machine.csv)、設備狀態(tài)信息(showFactChangeRecordList.csv)、環(huán)境檢測信息(showFactEnvironmentData.csv)和產品加工信息(showFactProduceRecord.csv)的采集和處理。其中HDFS、Hive和HBase等組件用來存儲數(shù)據(jù),Spark用來對離線數(shù)據(jù)進行抽取、清洗和預處理,F(xiàn)link主要對實時生產數(shù)據(jù)進行計算和分析后存儲到Redis或MySQL數(shù)據(jù)庫中[2]。集群整體結構如圖1所示。
2" 離線數(shù)據(jù)處理
離線數(shù)據(jù)處理是利用Spark技術對已經存儲在數(shù)據(jù)庫中的數(shù)據(jù)進行預處理,一般用Scala語言編寫,通常包括數(shù)據(jù)抽取、清洗和指標計算等操作[3]。
2.1" 數(shù)據(jù)抽取
數(shù)據(jù)抽取包含全量抽取和增量抽?。?]。全量抽取是將源數(shù)據(jù)庫中的所有數(shù)據(jù)抽取到目標數(shù)據(jù)庫中,增量抽取是將自上次抽取后發(fā)生改變的數(shù)據(jù)從源數(shù)據(jù)庫抽取到目標數(shù)據(jù)庫中。
2.1.1" 全量抽取
在生產過程中,研究人員通常會將數(shù)據(jù)從MySQL中抽取到Hive中,方便數(shù)據(jù)更高效的處理。MySQL中包括數(shù)據(jù)庫shtd_industry,抽取shtd_industry庫中ChangeRecord表的全量數(shù)據(jù)進入Hive的ods庫,構成表changerecord,字段排序、類型不變;同時添加靜態(tài)分區(qū),分區(qū)字段為etldate,類型為String,值為當前日期(如20230702)。ChangeRecord的表結構如表1所示。
抽取操作執(zhí)行完畢后,系統(tǒng)可以通過hive cli執(zhí)行show partitions ods.changerecord命令查看分區(qū)結果,如圖2所示。
2.1.2" 增量抽取
在企業(yè)生產過程中,有些數(shù)據(jù)只保留最新數(shù)據(jù),例如環(huán)境監(jiān)測表EnvironmentData(表結構如表2所示)中只保留每臺設備的最新監(jiān)測數(shù)據(jù),在數(shù)據(jù)抽取時抽取MySQL中shtd_industry庫的EnvironmentData表的增量數(shù)據(jù);將其輸入Hive的ods庫中構成表environmentdata,將ods.environmentdata表中inputtime作為增量字段,僅將新增的數(shù)據(jù)抽入,字段排序、類型不變;同時添加靜態(tài)分區(qū),分區(qū)字段為etldate,類型為String,值為當前日期(如20230702)。
2.2" 數(shù)據(jù)清洗
在生產過程中,系統(tǒng)會產生大量的“臟”數(shù)據(jù),數(shù)據(jù)清洗就是去除這些“臟”數(shù)據(jù),通過篩選、過濾等操作使數(shù)據(jù)變得更加干凈和準確[4]。數(shù)據(jù)清洗是數(shù)據(jù)處理過程中非常重要的一步,可以提高數(shù)據(jù)的質量和可信度,為后續(xù)的數(shù)據(jù)處理工作提供更有效安全的數(shù)據(jù),例如對數(shù)據(jù)進行去重整合等操作。
在數(shù)據(jù)抽取中,系統(tǒng)將MySQL中數(shù)據(jù)抽取到Hive的ods庫中,在數(shù)據(jù)清洗中將ods庫中的changerecord全量數(shù)據(jù)抽取到dwd庫表fact_change_record中,在抽取之前須要對數(shù)據(jù)根據(jù)changeid和changemachineid進行聯(lián)合去重處理,并且添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time這4列。其中dwd_insert_user、dwd_modify_user均填寫“user1”;dwd_insert_time、dwd_modify_time均填寫當前操作時間。執(zhí)行完畢后,系統(tǒng)使用hive cli按照change_machine_id、change_id降序排序,查詢前1條數(shù)據(jù),如圖3所示。
2.3" 指標計算
系統(tǒng)將清洗后的數(shù)據(jù)存入dwd庫,進入指標計算。指標計算是企業(yè)根據(jù)業(yè)務需求對數(shù)據(jù)進行的針對性查詢[5],通常是將多個數(shù)據(jù)源的數(shù)據(jù)集成到一個統(tǒng)一的數(shù)據(jù)存儲庫中。例如:系統(tǒng)使用Spark根據(jù)dwd層的fact_change_record表和dim_machine表統(tǒng)計,計算每個車間設備的月平均運行時長與所有設備的月平均運行時長對比結果(即設備狀態(tài)為“運行”,結果值為:高/低/相同),計算結果存入MySQL數(shù)據(jù)庫shtd_industry的machine_running_compare表中。dim_machine表、machine_running_compare表結構分別如表3和4所示。
3" 實時數(shù)據(jù)采集與處理
在工業(yè)生產過程中,系統(tǒng)除了對離線數(shù)據(jù)進行處理,還須要對實時數(shù)據(jù)進行處理。實時數(shù)據(jù)通過Flume采集后存儲在Kafka消息隊列中,再通過Flink讀取Kafka中的流數(shù)據(jù),對數(shù)據(jù)進行實時處理與分析,將結果存儲到數(shù)據(jù)庫中。
3.1" 實時數(shù)據(jù)采集
在主節(jié)點中,系統(tǒng)使用Flume采集/data_log目錄下實時日志文件中的數(shù)據(jù),將數(shù)據(jù)存入Kafka的Topic中(Topic名稱為ChangeRecord,分區(qū)數(shù)為4)[6],F(xiàn)lume采集ChangeRecord主題的配置如圖4所示。
3.2" 實時數(shù)據(jù)處理
當實時數(shù)據(jù)采集完畢后,系統(tǒng)使用Flink消費Kafka中ChangeRecord主題的數(shù)據(jù)[7],例如每隔1 min輸出最近3 min的預警次數(shù)最多的設備,將結果存入Redis,key值為“warning_last3min_everymin_out”,value值為“窗口結束時間,設備id”。本文使用redis cli以HGETALL key方式獲取warning_last3min_everymin_out值,如圖5所示。
4" 結語
文章介紹了工業(yè)大數(shù)據(jù)高可用集群搭建的整體架構,在此基礎上實現(xiàn)了離線數(shù)據(jù)處理以及實時數(shù)據(jù)的采集和處理。系統(tǒng)在離線數(shù)據(jù)處理中采用數(shù)據(jù)抽取、清洗和指標計算;在實時數(shù)據(jù)中使用Flume采集數(shù)據(jù)到Kafka中,再通過Flink技術進行計算后將結果存入Redis。整個流程來自真實的企業(yè)生產過程。本文將大數(shù)據(jù)技術應用到企業(yè)生產中,為企業(yè)生產效率提高、轉換提供了有效價值。
參考文獻
[1]劉曉莉,李滿,熊超,等.基于Hadoop搭建高可用數(shù)據(jù)倉庫的研究和實現(xiàn)[J].現(xiàn)代信息科技,2023(1):99-101.
[2]黎心怡,夏梓彤,莊嘉濠,等.基于大數(shù)據(jù)技術的實時軌道交通分析預測可視化系統(tǒng)的設計與實現(xiàn)[J].電腦知識與技術,2023(29):71-74.
[3]鄭倩倩.基于Kettle的工業(yè)數(shù)據(jù)集成與應用[D].重慶:西南大學,2023.
[4]謝文閣,佟玉軍,賈丹,等.數(shù)據(jù)清洗中重復記錄清洗算法的研究[J].軟件工程師,2015(9):61-62.
[5]何文韜.基于Spark的工業(yè)大數(shù)據(jù)能效分析平臺的設計與實現(xiàn)[D].大連:大連理工大學,2018.
[6]林子雨.數(shù)據(jù)采集與預處理[M].北京:人民郵電出版社,2022.
[7]林子雨,陶繼平.Flink編程基礎[M].北京:清華大學出版社,2022.
(編輯" 王雪芬)
Design and implementation of industrial big data high availability cluster construction based
on big data technology
ZHANG" Yanmin, MA" Xiaotao, YANG" Bingqian, WU" Weihong, ZHAO" Bin
(Hebei Software Institute, Baoding 071000, China)
Abstract: With the wide application of the Internet in industrial production, the development of industrial Internet is advancing rapidly. In industrial production, in order to assist enterprises in better collecting, analyzing, and preprocessing the industrial big data, it is necessary to build a big data cluster to complete various production processes using big data technology. Hadoop based highly available distributed frameworks have become the preferred choice for many enterprises in cluster construction. In the article, based on highly available Hadoop components,some important components in the big data ecosystem such as Hive, HBase,Spark, Flink,Kafka, etc." are built to store, collect, extract, clean, preprocess, and analyze data, helping enterprises improve production processes and increase production efficiency.
Key words: industrial big data; Hadoop cluster construction; data processing