王 梓,梁正和,吳瑩瑩
(河海大學(xué) 計(jì)算機(jī)與信息學(xué)院,江蘇 南京 211110)
大數(shù)據(jù)時代的到來,信息已成為各行各業(yè)發(fā)展的決策依據(jù)。世界計(jì)算機(jī)數(shù)據(jù)信息總量海量增長,已經(jīng)有越來越多的企業(yè)、教育、科研機(jī)構(gòu)和生廠商們意識到這些數(shù)據(jù)中所包含的重要價值,ETL的發(fā)展已成為必然趨勢[1]。企業(yè)在使用各種應(yīng)用系統(tǒng)期間往往積累了大量的信息、數(shù)據(jù)資源,計(jì)算機(jī)技術(shù)的快速發(fā)展使用戶可以對這些信息進(jìn)行采集、分析、處理,它們成為了企業(yè)發(fā)展的決策性依據(jù),構(gòu)成了企業(yè)發(fā)展的寶貴財(cái)富。隨著時間的推移,技術(shù)的進(jìn)步,原有舊系統(tǒng)不斷升級從而被新系統(tǒng)完全取代,在此使用期間往往會積累大量珍貴的歷史數(shù)據(jù),這些數(shù)據(jù)也是新系統(tǒng)成功啟用的關(guān)鍵。同時,這些從舊系統(tǒng)到新系統(tǒng)使用過程中的歷史數(shù)據(jù)也是企業(yè)進(jìn)行決策發(fā)展,制定戰(zhàn)略方向的重要依據(jù)。
如今,隨著數(shù)據(jù)的大量累積,越來越多的企業(yè)、廠商都在構(gòu)建數(shù)據(jù)倉庫(data warehouse)來滿足企業(yè)在發(fā)展過程中的戰(zhàn)略需要,他們需要將這些可能來自不同的軟硬件平臺、操作系統(tǒng)、數(shù)據(jù)模型乃至地理上分布、管理上自治和模式上異構(gòu)的數(shù)據(jù)源進(jìn)行集成。一個或多個不同數(shù)據(jù)源的相關(guān)數(shù)據(jù)可以進(jìn)行綜合集中放入數(shù)據(jù)倉庫[2],在數(shù)據(jù)倉庫中可以針對不同的主題和匯總數(shù)據(jù)進(jìn)行統(tǒng)計(jì)和分析,從而為決策人員提供數(shù)據(jù)支持。在構(gòu)建數(shù)據(jù)倉庫的過程中,首先需要將各種分布的、異構(gòu)的數(shù)據(jù)源中的數(shù)據(jù)抽取出來,在此過程中進(jìn)行清洗、轉(zhuǎn)換、集成最后加載到數(shù)據(jù)倉庫中,這個過程叫做抽取轉(zhuǎn)換裝載即ETL(extraction transformation loading)[3]數(shù)據(jù)遷移。ETL在構(gòu)建過程中需要面對傳輸效率、準(zhǔn)確性、數(shù)據(jù)異構(gòu)性、多目標(biāo)等問題。傳統(tǒng)的ETL在解決不同操作系統(tǒng)之間使用不同的編程語言問題方面,設(shè)置一個專有的轉(zhuǎn)換引擎置于數(shù)據(jù)源和目標(biāo)數(shù)據(jù)倉庫[4]之間,用于運(yùn)行所有的轉(zhuǎn)換程序。但在數(shù)據(jù)轉(zhuǎn)換過程中,專有引擎執(zhí)行所有轉(zhuǎn)換工作成為“瓶頸”。隨著企業(yè)的發(fā)展,數(shù)據(jù)需要從結(jié)構(gòu)化數(shù)據(jù)源(關(guān)系數(shù)據(jù)庫),非結(jié)構(gòu)化數(shù)據(jù)源(PDF文件、郵件等),半結(jié)構(gòu)化數(shù)據(jù)源(XML和其他標(biāo)記語言),遺留系統(tǒng)(主機(jī))、應(yīng)用程序包(SAP)等異構(gòu)數(shù)據(jù)源中提取,同時數(shù)據(jù)量也呈現(xiàn)出遞增式的增長,對數(shù)據(jù)的存儲[5],數(shù)據(jù)的異構(gòu)性、并發(fā)性進(jìn)行研究已成為當(dāng)前的主要研究方向。
在ETL構(gòu)建過程中,盡量降低ETL過程的設(shè)計(jì)與維護(hù)代價,提升ETL過程的執(zhí)行效率,是企業(yè)在實(shí)際項(xiàng)目中重要考慮的問題,因此,設(shè)計(jì)一種優(yōu)秀的ETL工具對數(shù)據(jù)倉庫[6]非常有益。利用ETL工具可以對異構(gòu)數(shù)據(jù)源中的業(yè)務(wù)數(shù)據(jù)進(jìn)行抽取和轉(zhuǎn)換,并將其裝載到數(shù)據(jù)倉庫[7],其主要作用是對各類業(yè)務(wù)數(shù)據(jù)的清理、轉(zhuǎn)換和裝載,為基于數(shù)據(jù)倉庫的決策分析應(yīng)用提供高質(zhì)量的數(shù)據(jù)。截止目前,生產(chǎn)的數(shù)據(jù)量大大提高,傳統(tǒng)數(shù)據(jù)處理和數(shù)據(jù)倉庫技術(shù)已不能滿足海量數(shù)據(jù)[8]處理的現(xiàn)實(shí)需求,因此基于Kafka和Disruptor,提出一種對傳統(tǒng)ETL進(jìn)行改進(jìn)的模型[9],并就某教育企業(yè)對全國高校系統(tǒng)數(shù)據(jù)遷移進(jìn)行了實(shí)驗(yàn)研究。
ETL是對數(shù)據(jù)進(jìn)行抽取、清洗、轉(zhuǎn)換和裝載[10]的過程,數(shù)據(jù)從異構(gòu)數(shù)據(jù)源中抽取,遷移到指定的目標(biāo)庫。其間,數(shù)據(jù)的抽取、清洗、轉(zhuǎn)換和裝載形成串行或并行的過程。T過程是ETL的核心,也是數(shù)據(jù)的轉(zhuǎn)換,而抽取和裝載一般可以作為轉(zhuǎn)換的輸入和輸出,或者作為一個單獨(dú)的部件,其復(fù)雜程度沒有轉(zhuǎn)換部件高。ETL是構(gòu)建數(shù)據(jù)倉庫的重要組成部分,用戶從數(shù)據(jù)源抽取出所需的數(shù)據(jù),經(jīng)過數(shù)據(jù)清洗,最終按照預(yù)先定義好的數(shù)據(jù)倉庫模型,將數(shù)據(jù)加載到數(shù)據(jù)倉庫中。傳統(tǒng)的ETL上手快,易操作,可是當(dāng)數(shù)據(jù)海量增長時,傳統(tǒng)的ETL在性能和數(shù)據(jù)處理的準(zhǔn)確性、多樣性、并發(fā)性等方面卻大打折扣?,F(xiàn)在數(shù)據(jù)的ETL過程經(jīng)常會選擇Kafka作為消息中間件應(yīng)用在離線和實(shí)時的場景中。針對上述問題,文中基于Kafka和Disruptor并發(fā)框架對傳統(tǒng)ETL進(jìn)行了改進(jìn)。
Disruptor是一個高性能的異步處理框架,一般被設(shè)計(jì)在生產(chǎn)者—消費(fèi)者(producer-consumer problem,PCP)問題上,可以獲得盡量高的吞吐量(TPS)和盡量低的延遲。針對“并發(fā)、緩沖區(qū)、生產(chǎn)者—消費(fèi)者模型、事務(wù)處理”這些元素的程序來說,Disruptor是一種大幅提升性能(TPS)的方案。它本質(zhì)上是個ringbuffer,buffer(就是數(shù)組)做過優(yōu)化防止JVM偽共享,lock free是通過CAS自旋[11],多線程[12]并發(fā)獲取buffer中的序號,這里需要CAS,把事件放入槽中,工作線程調(diào)度交給jdk線程池,只要buffer中有事件,就不停提交給線程池,不需要鎖進(jìn)程,解決了多線程讀寫,實(shí)現(xiàn)讀寫同步,解決了數(shù)據(jù)延遲的問題。
(1)disruptor沒有鎖,所以效率高,速度快。
(2)所有訪問者都記錄自己序號的實(shí)現(xiàn)方式[13],允許多個生產(chǎn)者與多個消費(fèi)者共享相同的數(shù)據(jù)結(jié)構(gòu)。
近年來,Kafka作為一個新興的分布式消息系統(tǒng),受到了眾多企業(yè)、科研機(jī)構(gòu)的青睞。Kafka在分布式集群應(yīng)用中作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)[14]而應(yīng)用廣泛。流數(shù)據(jù)是大多數(shù)集群統(tǒng)計(jì)和實(shí)時數(shù)據(jù)采集過程中所產(chǎn)生的數(shù)據(jù),可能包括頁面訪問量、物聯(lián)網(wǎng)傳感器采集數(shù)據(jù)等方式。Kafka用作LinkedIn的活動流(activity stream)和運(yùn)營數(shù)據(jù)處理[15]管道(pipeline)的基礎(chǔ)。在消息保存中Kafka根據(jù)每個topic進(jìn)行歸類,消息發(fā)送者稱為producer,接收消息者稱為consumer,此外Kafka集群[16]由多個Kafka實(shí)例組成,每個實(shí)例(server)稱為broker。
(1)持久化能力的高效性。對TB級以上的數(shù)據(jù)也可以在常數(shù)時間復(fù)雜度讀取和寫入硬盤。
(2)支持broker間的消息分區(qū),并保證分區(qū)中消息讀取的有序性。
(3)分布式系統(tǒng),易于向外擴(kuò)展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機(jī)即可擴(kuò)展機(jī)器。
(4)消息被處理的狀態(tài)是在consumer端維護(hù),而不是由server端維護(hù)。當(dāng)失敗時能自動平衡。
(5)異步:Kafka分布式[17]消息系統(tǒng)采用異步通信機(jī)制,消息進(jìn)入系統(tǒng)緩存后系統(tǒng)無需立刻應(yīng)答或處理,可以根據(jù)用戶需求和配置情況選擇。
(6)持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份,防止數(shù)據(jù)丟失。
基于Kafka和Disruptor數(shù)據(jù)處理技術(shù)對傳統(tǒng)ETL的改進(jìn)如圖1所示。
圖1 改進(jìn)的ETL模型
實(shí)驗(yàn)中,Kafka結(jié)合Disruptor技術(shù),數(shù)據(jù)通道在啟動后,會啟動一個restful數(shù)據(jù)被動接收接口,前端埋點(diǎn)或其他的數(shù)據(jù)收集服務(wù)會將收集到的數(shù)據(jù)以restful+json的方式傳到接口,接口在接收到數(shù)據(jù)后,會根據(jù)配置的類型,將數(shù)據(jù)發(fā)送到指定的接收源,接口源可以是配置的一個read-type,是Kafka的job,實(shí)現(xiàn)數(shù)據(jù)之間的快速傳輸。從多種數(shù)據(jù)源中抽取數(shù)據(jù),將數(shù)據(jù)發(fā)送到Disruptor的RingBuffer環(huán)形區(qū)域,Disruptor數(shù)據(jù)消費(fèi)者發(fā)送數(shù)據(jù)到Kafka服務(wù)器,由于Disruptor高性能[18]、低延遲的特點(diǎn),從而提高了目標(biāo)源數(shù)據(jù)到Kafka中數(shù)據(jù)的速度,因此在傳輸速度方面對傳統(tǒng)的ETL有很大的改進(jìn),極大節(jié)約了時間[19]。再利用Kafka高吞吐和異步的特性,Disruptor可持續(xù)發(fā)送數(shù)據(jù)到Kafka中,Kafka消費(fèi)者處理數(shù)據(jù),節(jié)省了數(shù)據(jù)等待傳輸時間,同時該模型可接受多種不同數(shù)據(jù)源,實(shí)現(xiàn)數(shù)據(jù)多樣性的傳輸。
為此,在研發(fā)創(chuàng)新方面,華岳每年投入大量資金,不斷提升產(chǎn)品質(zhì)量和智能化程度,同時也收獲了寶貴的自主知識產(chǎn)權(quán)。截至目前,公司共獲得48項(xiàng)專利,其中發(fā)明專利7項(xiàng),實(shí)用新型38項(xiàng),有力支撐了產(chǎn)品更新升級和向高端化發(fā)展。
啟動數(shù)據(jù)通道,在通道啟動一個restful數(shù)據(jù)被動接收接口,解析配置文件,程序初始化操作→啟動消費(fèi)者線程→生產(chǎn)者進(jìn)程發(fā)布事件到Disruptor,從目標(biāo)源中讀取數(shù)據(jù)到Disruptor的RingBuffer環(huán)形隊(duì)列中,Disruptor消費(fèi)者將RingBuffer環(huán)形隊(duì)列[20]收集到的數(shù)據(jù)通過restful+json的方式傳到接口,接口在接收到數(shù)據(jù)后將數(shù)據(jù)發(fā)送到Kafka。由于broker的增加或者減少都會觸發(fā)Consumer Rebalance,數(shù)據(jù)通過Kafka Consumer開始處理partition里面的message,實(shí)驗(yàn)中接收源為jdbc,數(shù)據(jù)發(fā)送到指定的接收源過程中對數(shù)據(jù)進(jìn)行了清洗和裝載。由于Kafka高吞吐、異步性的特點(diǎn),可以將數(shù)據(jù)存放在Kafka服務(wù)器端,隨時處理服務(wù)器端的數(shù)據(jù)。
Disruptor讀取源數(shù)據(jù)和發(fā)送數(shù)據(jù)給Kafka broker服務(wù)器,在程序中的主要流圖如圖2所示。
圖2 Disruptor讀寫過程
Kafka原理結(jié)構(gòu)如圖3所示。
圖3 Kafka原理
圖4所示為改進(jìn)后的ETL將MYSQL數(shù)據(jù)庫中的數(shù)據(jù)遷移到Postgresql數(shù)據(jù)庫中。t_ampa_useraction分別在MYSQL不同的數(shù)據(jù)庫下,在job配置文件中配置了目標(biāo)源和目標(biāo)數(shù)據(jù)庫,啟動項(xiàng)目,開始數(shù)據(jù)遷移。實(shí)驗(yàn)中使用了4張表同時進(jìn)行,由開始時間和接入時間可見,極大地節(jié)約了數(shù)據(jù)傳輸時間。
圖4 MYSQL中數(shù)據(jù)導(dǎo)入Postgresql
圖5所示為傳統(tǒng)ETL將MYSQL數(shù)據(jù)庫中的數(shù)據(jù)遷移到Postgresql數(shù)據(jù)庫中,兩個ETL同時啟用。
圖5 傳統(tǒng)ETL MYSQL中數(shù)據(jù)導(dǎo)入Postgresql
通過實(shí)驗(yàn)對比,現(xiàn)在數(shù)據(jù)的ETL過程經(jīng)常會選擇Kafka作為消息中間件應(yīng)用在離線和實(shí)時的場景中,結(jié)合Kakfa的特性和Disruptor高并發(fā)、高吞吐的特點(diǎn),Disruptor消費(fèi)者發(fā)送數(shù)據(jù)到Kafka服務(wù)器中,實(shí)現(xiàn)數(shù)據(jù)的高效傳輸。傳統(tǒng)的ETL當(dāng)數(shù)據(jù)量上升到一定程度時,傳輸?shù)挠涗浻袝r會缺失,處理時間過長,無法實(shí)現(xiàn)并發(fā)存儲,對此做出了改進(jìn)。在實(shí)際開發(fā)過程中,因?yàn)橥瑫r對各個高校的數(shù)據(jù)進(jìn)行遷移,數(shù)據(jù)量急劇增長,因此使用Kafka高吞吐量、低延遲、異步的特性,可以極大提高數(shù)據(jù)的傳輸效率。
文中利用Kafka和Disruptor并發(fā)框架兩種數(shù)據(jù)處理技術(shù)快速構(gòu)建數(shù)據(jù)ETL通道,憑借高吞吐量、低延遲的特點(diǎn),極大節(jié)約了數(shù)據(jù)之間的傳輸時間。實(shí)驗(yàn)采用分布式消息系統(tǒng)作為大規(guī)模流數(shù)據(jù)的緩存,提高了平臺對動態(tài)流數(shù)據(jù)輸入數(shù)據(jù)量突發(fā)性變化的適應(yīng)能力。針對多種數(shù)據(jù)源如http、txt、jdbc等的處理,對傳統(tǒng)ETL進(jìn)行了改進(jìn),實(shí)現(xiàn)對大量數(shù)據(jù)的并發(fā)處理,使不同數(shù)據(jù)庫之間的數(shù)據(jù)能夠快速、同步、多樣地傳輸。
雖然對傳統(tǒng)ETL在處理速度和吞吐量方面進(jìn)行了改進(jìn),但是在排序、分頁等功能上做得還不夠完善,當(dāng)job上升到10個以上時,xml文件解析便容易出錯,對這些問題將有待進(jìn)一步完善。