吳海濤 廖 兵 王 丹 李張?zhí)?周文進(jìn)
(1.四川明星電力股份有限公司,四川 遂寧 629000 2.北京國網(wǎng)信通埃森哲信息技術(shù)有限公司,北京 100031)
目前數(shù)據(jù)中心的數(shù)據(jù)抽取方式大致為全量數(shù)據(jù)接入方式或基于全量對比的數(shù)據(jù)抽取技術(shù)。該方式處理效率低下,經(jīng)常會造成數(shù)據(jù)積壓嚴(yán)重,通常無法達(dá)到數(shù)據(jù)的實(shí)時(shí)性抽取任務(wù)要求。針對該問題,本系統(tǒng)對現(xiàn)今的增量抽取技術(shù)進(jìn)行了研究,提出一種Change Data Capture (CDC)結(jié)合GPkafka的實(shí)時(shí)數(shù)據(jù)接入抽取技術(shù),極大地提升了目前明星電力公司中臺數(shù)據(jù)抽取實(shí)時(shí)性,達(dá)到了零數(shù)據(jù)積壓的數(shù)據(jù)入庫要求。
CDC又稱變更數(shù)據(jù)捕獲(Change Data Capture)。CDC有兩個(gè)模式:同步和異步。同步CDC主要是采用觸發(fā)器記錄新增數(shù)據(jù),基本能夠做到實(shí)時(shí)增量抽取。而異步CDC則是通過分析已經(jīng)commit的日志記錄來得到增量數(shù)據(jù)信息,有一定的時(shí)間延遲,并且提供了到Oracle Streams的接口。同步相對比較簡單,通過觸發(fā)器捕獲增量數(shù)據(jù)。而異步CDC根據(jù)實(shí)現(xiàn)的內(nèi)部機(jī)制區(qū)別,又可以分為異步HotLog模式,異步分布式HotLog模式和異步AutoLog模式[1]。
2.1 Kafka。Kafka是一個(gè)分布式消息隊(duì)列。Kafka對消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer。Producer即生產(chǎn)者,向Kafka集群發(fā)送消息,在發(fā)送消息之前,會對消息進(jìn)行分類。Topic即主題,通過對消息指定主題可以將消息分類,消費(fèi)者可以只關(guān)注自己需要的Topic中的消息。Consumer即消費(fèi)者,消費(fèi)者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進(jìn)行處理。
2.2 GPSS。Greenplum Stream Server(GPSS)是 一 個(gè)ETL(提取、轉(zhuǎn)換、加載)工具。GPSS服務(wù)器的一個(gè)實(shí)例從一個(gè)或多個(gè)客戶機(jī)接收流數(shù)據(jù),使用Greenplum數(shù)據(jù)庫可讀的外部表將數(shù)據(jù)轉(zhuǎn)換并插入到目標(biāo)Greenplum表中。數(shù)據(jù)源和數(shù)據(jù)格式是特定于客戶機(jī)的。數(shù)據(jù)源和數(shù)據(jù)格式由客戶端指定[2]。
整體技術(shù)流程圖如圖1所示
圖1 基于異步聯(lián)機(jī)日志結(jié)合流處理的實(shí)時(shí)數(shù)據(jù)抽取實(shí)現(xiàn)
系統(tǒng)采用可視化數(shù)據(jù)抽取流程建模技術(shù)實(shí)現(xiàn)了以上的數(shù)據(jù)抽取過程??梢暬瘮?shù)據(jù)抽取技術(shù)通過組件連接的方式形成數(shù)據(jù)處理管道。
整體數(shù)據(jù)實(shí)時(shí)抽取流程由兩個(gè)管道組成,分別是數(shù)據(jù)生產(chǎn)者和數(shù)據(jù)消費(fèi)者。
3.1 數(shù)據(jù)生產(chǎn)者管道。通過解析異步聯(lián)機(jī)日志,CDC感知數(shù)據(jù)源庫Oracle中表的數(shù)據(jù)更新、增加和刪除操作,并將操作流轉(zhuǎn)換成數(shù)據(jù)流發(fā)送到Kafka中。通過CDC讀取到的日志進(jìn)行SQL解析與轉(zhuǎn)換,將數(shù)據(jù)解析為對應(yīng)數(shù)據(jù)的JSON對象,并標(biāo)記該數(shù)據(jù)是增加、修改還是刪除數(shù)據(jù),然后將數(shù)據(jù)存入到Kafka中。需要注意的是,存儲到Kafka中的數(shù)據(jù),不同來源表的數(shù)據(jù)需要存儲到不同的Topic中,相當(dāng)于每個(gè)Topic中僅存儲固定的一張?jiān)幢淼脑隽繑?shù)據(jù)信息,一般情況下,使用Oracle的schema名和table名聯(lián)合起來作為Topic的名稱[3]。
3.2 數(shù)據(jù)消費(fèi)者管道。GPKafka通過Job方式將導(dǎo)數(shù)的配置提交到GPSS,GPSS讀取Kafka對應(yīng)的Topic數(shù)據(jù),并寫入對應(yīng)的Grennplum數(shù)據(jù)庫,然后將Job的調(diào)度的信息存入到本地文件中。GPKafka消費(fèi)者根據(jù)配置的目標(biāo)倉庫的schema和table信息,自動生成對應(yīng)的Job配置文件,然后提交Job至GPSS中進(jìn)行執(zhí)行。GPSS接收到Job信息后,開始根據(jù)Job配置讀取Kafka中對應(yīng)Topic中的數(shù)據(jù),然后寫入到GreenPlum數(shù)據(jù)庫中。當(dāng)同時(shí)運(yùn)行的Job數(shù)量過多時(shí),GPKafka消費(fèi)者會根據(jù)實(shí)際情況,暫停和調(diào)度不同的Job運(yùn)行情況,以滿足當(dāng)前數(shù)據(jù)抽取業(yè)務(wù)的需要。最后將流程運(yùn)行階段性情況和調(diào)度情況寫入到本地文件系統(tǒng)中[4]。
數(shù)據(jù)來源:明星電力公司中臺數(shù)據(jù),大小為從100M-5G單表。
測試環(huán)境:1臺Oracle數(shù)據(jù)庫服務(wù)器,1臺大數(shù)據(jù)中臺服務(wù)器,1個(gè)GreenPlum實(shí)例(6臺服務(wù)器,1個(gè)Master,24個(gè)Segment),Kafka集群(3臺服務(wù)器節(jié)點(diǎn)),服務(wù)器物理內(nèi)存64G,內(nèi)網(wǎng)帶寬1Gbps。測試所用源庫表以完全入庫完畢。
測試方法:對比方法:
第一,CDC讀取數(shù)據(jù)庫數(shù)據(jù)變化日志,然后通過傳統(tǒng)JDBC直接寫入Grennplum數(shù)據(jù)庫,后面稱為(CDC+JDBC);第二,我們系統(tǒng)采用的方法:通過CDC讀取數(shù)據(jù)庫數(shù)據(jù)變化日志,然后通過GPKafka直接寫入Grennplum數(shù)據(jù)庫,后面稱為(CDC+GPKAFKA);第三,通過JDBC實(shí)現(xiàn)全量抽取,并通過Greenplum存儲組件將數(shù)據(jù)寫入Grennplum數(shù)據(jù)庫,后面稱為(JDBC全量抽?。5]。
實(shí)現(xiàn)對比結(jié)果如表3、表4所示。
表3 各個(gè)方法測試對比結(jié)果(累計(jì)讀取源Oracle數(shù)據(jù)庫效率)
表4 各個(gè)方法測試對比結(jié)果(累計(jì)寫入目標(biāo)Greenplum數(shù)據(jù)庫效率)
通過觀察計(jì)算可以發(fā)現(xiàn)JDBC全量抽取和CDC+JDBC這兩種方式再讀取和寫入效率上都是一致的,因?yàn)樗麄兌荚谕粋€(gè)任務(wù)中做處理。而CDC+GPKAFKA這種方式,由于讀取出數(shù)據(jù)后,暫存在Kafka中,有個(gè)消費(fèi)的過程,它的讀取和寫入的效率是有區(qū)別的,寫入效率遠(yuǎn)高于讀取的效率。從事實(shí)數(shù)據(jù)抽取的效率上面來看,CDC+GPKAFKA這種方式是最優(yōu)選。CDC+GPKAFKA由于同時(shí)采用了CDC異步日志分析技術(shù)和kafak流式并行寫入技術(shù),不經(jīng)過JDBC接口因此在小表和大表,小數(shù)據(jù)改變量到大數(shù)據(jù)改變量情況下都保持了非常好的數(shù)據(jù)抽取寫入性能[6]。
我們對多個(gè)業(yè)務(wù)數(shù)據(jù)源系統(tǒng)進(jìn)行數(shù)據(jù)實(shí)時(shí)抽取,包含的數(shù)據(jù)源表總共約5000余張,歷史存量數(shù)據(jù)約150GB,月增量約10GB。同時(shí),以上所述業(yè)務(wù)系統(tǒng)內(nèi)的原始數(shù)據(jù)源表,包含部分未做分區(qū)、分表等設(shè)計(jì)的超大表,無唯一鍵和主鍵等現(xiàn)象,其中最大的表記錄數(shù)量達(dá)到2億條,約5GB,為數(shù)據(jù)中心對于該類表的數(shù)據(jù)實(shí)時(shí)性抽取增加了較大的困難和障礙。原始的數(shù)據(jù)抽取方式為全量數(shù)據(jù)接入方式和基于全量對比的數(shù)據(jù)抽取技術(shù)。該方式處理效率低下,通常造成數(shù)據(jù)積壓嚴(yán)重,無法達(dá)到數(shù)據(jù)的實(shí)時(shí)性抽取任務(wù)要求[7]。
針對該問題,提出的一種CDC結(jié)合GPKafka的實(shí)時(shí)數(shù)據(jù)接入抽取技術(shù),提升了中臺數(shù)據(jù)抽取實(shí)時(shí)性。通過與CDC+JDBC方式、JDBC全量抽取方式對比,CDC+GPKAFKA有明顯的優(yōu)勢,在對1億條數(shù)據(jù)量的讀寫測試中,CDC+GPKafka的方式讀寫數(shù)據(jù)效率分別261000條/分鐘和509000條/分鐘。達(dá)到了零數(shù)據(jù)積壓的數(shù)據(jù)入庫要求。