于金良,朱志祥,梁小江
(1.西安郵電大學,陜西 西安 710061;2.陜西省信息化工程研究院,陜西 西安 710061)
基于Flume的MySQL數(shù)據(jù)自動收集系統(tǒng)
于金良1,朱志祥1,梁小江2
(1.西安郵電大學,陜西 西安 710061;2.陜西省信息化工程研究院,陜西 西安 710061)
針對分布式系統(tǒng)中、不同數(shù)據(jù)中心之間的數(shù)據(jù)收集,同時解決將數(shù)據(jù)由關系型數(shù)據(jù)庫交換到非關系型數(shù)據(jù)庫的問題,提出一種基于Flume的MySQL數(shù)據(jù)庫數(shù)據(jù)自動收集系統(tǒng)。為了符合現(xiàn)實中的生產(chǎn)環(huán)境,該系統(tǒng)采用的是一種星型拓撲結(jié)構(gòu)。系統(tǒng)可以自動查詢給定的MySQL數(shù)據(jù)庫表,自動檢測表中的數(shù)據(jù)更新,實現(xiàn)自動增量傳輸,并對原始數(shù)據(jù)進行封裝、解析,最終將數(shù)據(jù)存儲到非關系型數(shù)據(jù)庫HBase中。在測試中,系統(tǒng)中每臺機器的平均傳輸速度可達到1 111 kb/s,系統(tǒng)總的平均傳輸速度可以達到3 333 kb/s,并且保證了數(shù)據(jù)的完整性,實現(xiàn)了可靠高效傳輸?shù)哪繕恕?/p>
Flume;MySQL數(shù)據(jù)庫;數(shù)據(jù)收集;HBase;JDBC
近年來,隨著信息技術的快速發(fā)展,帶來的是各種信息、數(shù)據(jù)的爆發(fā)式增長,大數(shù)據(jù)時代應運而生。2008年8月,首次提出大數(shù)據(jù)的概念。在大數(shù)據(jù)時代,TB、PB,甚至EB級的數(shù)據(jù)已經(jīng)成為一種常態(tài)。為了應對大數(shù)據(jù)的存儲、處理以及大型計算機成本高的難題,集群化的分布式系統(tǒng)快速發(fā)展并取代了單機服務系統(tǒng)[1]。各大公司和開源社區(qū)紛紛提出了自己的大數(shù)據(jù)解決方案,其中開源的Hadoop生態(tài)系統(tǒng)最為熱門[2]。
Hadoop是一個并行處理大規(guī)模數(shù)據(jù)的分布式計算和存儲系統(tǒng),可以將分布式系統(tǒng)部署在廉價機器上[3]。要使用Hadoop來存儲、處理數(shù)據(jù),首先要解決的問題是如何將數(shù)據(jù)收集到Hadoop平臺上。而將原始關系型數(shù)據(jù)庫中的數(shù)據(jù)導入到Hadoop中的非關系型數(shù)據(jù)庫HBase中顯得尤為重要。文中系統(tǒng)使用Hadoop的子項目Flume收集分布式系統(tǒng)中各個計算機中數(shù)據(jù)庫的數(shù)據(jù),并最終存儲在非關系型數(shù)據(jù)庫HBase中[4]。在Hadoop生態(tài)系統(tǒng)中,Sqoop可以實現(xiàn)Hadoop集群與關系型數(shù)據(jù)之間的數(shù)據(jù)交換,但是由于它底層使用的是MapReduce計算框架,故依賴于Hadoop的集群環(huán)境,這是一大缺陷。而文中系統(tǒng)則可以脫離Hadoop環(huán)境,在構(gòu)建大數(shù)據(jù)平臺時更加靈活。
Flume最早是Cloudera開源的一個日志收集系統(tǒng),設計它的初衷是在分布式系統(tǒng)中提供可靠而有效的大規(guī)模日志收集服務[5]。2011年10月,Cloudera完成了Flume-728,對Flume進行了里程碑式的改動,重構(gòu)了核心組件、核心配置以及代碼結(jié)構(gòu),重構(gòu)后的版本統(tǒng)稱為Flume-ng。同時Cloldera將Flume貢獻給了Apache基金會,成為了頂級項目Hadoop中的一個子項目。它的架構(gòu)非常簡單靈活,尤其是當前的Flume-ng版本,它只是一個純粹的數(shù)據(jù)傳輸工具,將數(shù)據(jù)的讀入和寫出分為兩個獨立的部分,實現(xiàn)了二者的異步性。一個傳輸通道只是一個代理,各個代理之間又是相互獨立的。每個代理包括Source(數(shù)據(jù)源)、Channel(中間傳輸通道)、Sink(數(shù)據(jù)接收器)三個部分。
其中Source是將數(shù)據(jù)從數(shù)據(jù)源讀入,封裝為一個事件發(fā)送到Channel;Channel的作用是臨時緩存這些事件,為了保證數(shù)據(jù)的可靠性,當事件被Sink接收時才將其刪除,否則一直緩存;Sink負責接收事件后將數(shù)據(jù)存儲到指定的目的端,完成一次數(shù)據(jù)傳輸。
Flume作為日志收集系統(tǒng),支持多種數(shù)據(jù)源,如Exec、Spooling、Kafka、NetCat和用戶自定義的源等;擁有多種接收器,如HDFS、File Roll、HBase、Kafka、數(shù)據(jù)倉庫Hive[6]和用戶自定義的接收器等;包括多種Channel,如Memory、File等,其中File Channel具有很高的故障恢復能力。
它的使用也非常簡單,用戶根據(jù)自己的需求編寫配置文件,啟動代理服務,即可完成數(shù)據(jù)收集。
因為Flume是一個開源項目,所以用戶可以在原有的架構(gòu)上自己定制,實現(xiàn)自己的數(shù)據(jù)收集系統(tǒng)。
2.1 系統(tǒng)介紹
該系統(tǒng)是基于Flume-ng 1.6.0的MySQL數(shù)據(jù)庫數(shù)據(jù)收集系統(tǒng),收集MySQL中的數(shù)據(jù),自動監(jiān)測數(shù)據(jù)庫中的數(shù)據(jù)更新,實現(xiàn)實時增量收集,最終將數(shù)據(jù)存儲到非關系型數(shù)據(jù)庫HBase中??蓪⒃痉荋adoop集群中的數(shù)據(jù)導入到集群中,實現(xiàn)單機系統(tǒng)與分布式系統(tǒng)的數(shù)據(jù)交換。可在脫離Hadoop環(huán)境的前提下將數(shù)據(jù)導入到Hadoop集群,具有依賴小、量級輕的優(yōu)點。
系統(tǒng)需要實現(xiàn)的目標:
(1)可以收集MySQL數(shù)據(jù)庫中的原始數(shù)據(jù);
(2)自動檢測數(shù)據(jù)庫中數(shù)據(jù)的更新,只收集變化的數(shù)據(jù),實現(xiàn)數(shù)據(jù)的實時增量收集;
(3)將收集到的數(shù)據(jù)存儲到非關系型數(shù)據(jù)庫HBase中。
2.2 系統(tǒng)設計
在Flume的運行過程中根據(jù)設定會運行一個或者多個代理,由每一個代理完成數(shù)據(jù)的收集服務。每個代理都是一個進程,它們是相互獨立的,因此可以實現(xiàn)同時對多個數(shù)據(jù)源進行并行處理,以達到從分布式系統(tǒng)中的不同計算機上收集數(shù)據(jù)的目的。代理在運行的過程中,并不依賴于Hadoop環(huán)境,這使得將非Hadoop集群上的數(shù)據(jù)交換到集群中變得可行[7]。系統(tǒng)架構(gòu)如圖1所示。
圖1 系統(tǒng)架構(gòu)圖
為安裝有MySQL數(shù)據(jù)源的每臺機器部署一個Flume的代理,將Source配置為每臺機器的MySQL數(shù)據(jù)源,URL指定為連接MySQL數(shù)據(jù)庫的連接符,形如jdbc:mysql://IP:port/databasename;Sink配置為要將數(shù)據(jù)交換到Hadoop集群中的HBase數(shù)據(jù)庫。啟動每個機器上的代理,向集群發(fā)送數(shù)據(jù)。
代理要完成的主要工作是:
(1)檢測指定計算機上MySQL數(shù)據(jù)庫表中的數(shù)據(jù),實時檢測表中數(shù)據(jù)的更新,并收集、預處理更新數(shù)據(jù),預處理完成后記錄當前數(shù)據(jù)行數(shù),防止重復處理;
(2)將收集的數(shù)據(jù)通過HBase客戶端插入到HBase數(shù)據(jù)庫指定的數(shù)據(jù)表中。
在代理內(nèi)部,按數(shù)據(jù)的流向又可分為Source、Channel、Sink三個組件,分別完成不同的任務。Source組件的任務是查詢MySQL數(shù)據(jù)庫表中的數(shù)據(jù),檢查數(shù)據(jù)更新,預處理更新的數(shù)據(jù),且記錄處理數(shù)據(jù)的位置信息(即數(shù)據(jù)在表中所在的行數(shù)),將其保存在一個指定的文件中,并將數(shù)據(jù)封裝為事件發(fā)送到Channel中;Channel只負責緩存Source經(jīng)過處理后發(fā)送來的事件,等待Sink抽取事件,抽取完成后,該事件自動刪除;Sink負責抽取Channel中的緩存事件,并進行解析,最終將解析完成的數(shù)據(jù)存入到HBase數(shù)據(jù)庫中。該系統(tǒng)代理架構(gòu)如圖2所示。
圖2 系統(tǒng)代理架構(gòu)圖
2.3 主要組件的設計與實現(xiàn)
代理內(nèi)部各組件數(shù)據(jù)處理流程如圖3所示。
Source使用JDBC技術查詢數(shù)據(jù)庫表中的數(shù)據(jù),執(zhí)行查詢時采用分頁查詢的方法,所謂分頁查詢就是在數(shù)據(jù)量過大時分多次對數(shù)據(jù)庫進行查詢[8]。而MySQL數(shù)據(jù)庫的分頁查詢是通過調(diào)用LIMIT函數(shù)實現(xiàn)的[9],在SQL語句的結(jié)尾處指定每次查詢的行數(shù)。使查詢一次數(shù)據(jù)庫加載到內(nèi)存里的數(shù)據(jù)量減小,從而在總數(shù)據(jù)量變大時減輕內(nèi)存的壓力。對數(shù)據(jù)庫的查詢是循環(huán)執(zhí)行的,以達到實時監(jiān)控數(shù)據(jù)庫更新的目的。當檢測到有數(shù)據(jù)時(第一次查詢原始數(shù)據(jù)或者更新的數(shù)據(jù)),將這些數(shù)據(jù)從數(shù)據(jù)庫讀入到代理中,通過預處理將其封裝為一個事件,然后更新已處理完成數(shù)據(jù)的位置信息(即數(shù)據(jù)庫中數(shù)據(jù)的行數(shù)),并和數(shù)據(jù)庫連接符拼接在一起保存到指定的記錄文件中。在每次對此數(shù)據(jù)庫表進行查詢時先讀取此文件,獲得這個位置信息,查詢時從此行開始,之前的數(shù)據(jù)不再查詢,從而避免了重復查詢處理,也解決了使用內(nèi)存通道時不能自動故障恢復的缺陷[10]。當系統(tǒng)發(fā)生故障,導致代理宕機時,只需要重新啟動代理,從文件中讀取這個行數(shù)即可從上次處理的位置繼續(xù)處理,使代理具有很好的可恢復性。下面是Source組件的部分源代碼。
圖3 代理內(nèi)部各組件數(shù)據(jù)處理流程圖
public class MySQLSource extends AbstractSource implements Configurable, PollableSource[11]{
private JdbcHelper jdbcHelper; //通過JDBC查詢數(shù)據(jù)庫的輔助類
@Override
public Status process() throwsEventDeliveryException {
try{
sqlSourceCounter.startProcess();
//取得查詢數(shù)據(jù)得到的結(jié)果
List> result=jdbcHelper.executeQuery();
//判斷是否取得數(shù)據(jù)
if(!result.isEmpty()) {
//將數(shù)據(jù)發(fā)到ChannelcsvWriter.writeAll(sqlSourceHelper.getAllRows(result));
//處理完后,更新查詢行數(shù)sqlSourceCounter.incrementEventCount(result.size());
//更新記錄文件的數(shù)據(jù)即當前查詢數(shù)據(jù)表中的行數(shù)
sqlSourceHelper.updateStatusFile();
}
sqlSourceCounter.endProcess(result.size());
if(result.size() { Thread.sleep(sqlSourceHelper.getRunQueryDelay()); } //返回READY,執(zhí)行下一次查詢 return Status.READY; } 使用JDBC輔助查詢數(shù)據(jù)庫的部分代碼: public classJdbcHelper{ public List List //對數(shù)據(jù)庫實行分頁查詢,減小內(nèi)存的壓力,提高查詢效率 resultSet=connection.createStatement().executeQuery( (sqlSourceHelper.getQuery())+"LIMIT"+sqlSourceHelper.getCurrentIndex()+","+sqlSourceHelper.getMaxRows()));); //將查詢的數(shù)據(jù)緩存到rowsList中 while(resultSet.next()){ List for(int i=0;i } rowsList.add(list); } //更新新的查詢起始位置 sqlSourceHelper.setCurrentIndex(sqlSourceHelper.getCurrentIndex()+rowsList.size()); //返回查詢結(jié)果 returnrowsList; } } Channel的主要作用是緩存Source預處理的數(shù)據(jù),即事件。只有當事件被Sink抽取后才會從當前Channel中刪除。這種機制保證了數(shù)據(jù)傳遞的可靠性。系統(tǒng)采用內(nèi)存通道,將數(shù)據(jù)直接緩存在內(nèi)存中,優(yōu)點是數(shù)據(jù)傳輸速度快,減少數(shù)據(jù)由讀入端到寫出端的時延。 Sink主要負責從Channel抽取事件,由于事件包括一個事件頭和事件體,事件體中存儲的才是真正從MySQL中讀取的數(shù)據(jù),故先將事件體讀出,再跟據(jù)在配置文件中設定好的規(guī)則解析這些數(shù)據(jù),將數(shù)據(jù)分解為與原始字段和值一一對應的形式,最后把這些數(shù)據(jù)通過調(diào)用HBase的Thrift客戶端接口插入到指定的HBase表中[12]。下面是插入到HBase中部分代碼: public class HBaseSink extends AbstractSink implements Configurable{ //開啟HBaseSink public void start(){ try{ //先建立通過Thrift連接到Hbase服務器[13] client=privilegedExecutor.execute(new PrivilegedExceptionAction @Override public Hbase.Client run() throws Exception { socket=new TSocket(thriftQum,Integer.parseInt(port)); TProtocol protocol=new TBinaryProtocol(socket, true, true); client=new Hbase.Client(protocol); //打開Thrift連接 socket.open(); return client; } }); }catch(Exception e){ //TODO Auto-generated catch block e.printStackTrace(); sinkCounter.incrementConnectionFailedCount(); logger.error("Could not connect hbase"+thriftQum,e); } //實現(xiàn)接口的方法 @Override public Status process() throws EventDeliveryException { //初始化解析數(shù)據(jù)對象 serializer.initialize(event,columnFamily); //拿到解析后的數(shù)據(jù),并存放在actions中 actions.add(serializer.getActions()); //調(diào)用數(shù)據(jù)傳遞給插入Hbase的方法 putEventsAndCommit(actions,txn); } //真正將數(shù)據(jù)插入到HBase中的方法 private void putEventsAndCommit(final List privilegedExecutor.execute(new PrivilegedExceptionAction @Override public Void run() throws Exception { Map //將數(shù)據(jù)插入到Hbase中名為tableName的表中 client.mutateRows(wrap(tableName),actions,attributes); return null; } } 下面是Sink解析數(shù)據(jù)的部分代碼: public class MyHbaseEventSerializer implements HbaseEventSerializer { //配置文件中的一些屬性: //分隔數(shù)據(jù)的字符,默認為空格 public static final String SPLIT_CONFIG="splitChar"; public static final String SPLIT_DEFAULT=""; //Hbase表中字段的名字 public static final String COL_NAME_CONFIG="colNames"; //數(shù)據(jù)的編碼方式,默認為UTF-8 public static final String CHARSET_CONFIG ="charset"; public static final String CHARSET_DEFAULT="UTF-8"; //插入Hbase表中的列族 protected byte[]cf; public void configure(Context context) { //獲得配置文件中的屬性值 String splitChar=context.getString(SPLIT_CONFIG,SPLIT_DEFAULT); inputPattern=Pattern.compile(splitChar); charset=Charset.forName(context.getString(CHARSET_CONFIG,CHARSET_DEFAULT)); String colNameStr=context.getString(COL_NAME_CONFIG,COLUMN_NAME_DEFAULT); String[]columnNames=colNameStr.split(","); for(String s:columnNames) { colNames.add(s.getBytes(charset)); } } //從Channel中緩存的事件對象中獲得MySQL中的數(shù)據(jù) @Override public void initialize(Event event,byte[] columnFamily) { //從事件(包括header和數(shù)據(jù))中得到其中的數(shù)據(jù) this.payload=event.getBody(); //獲取列族 this.cf=columnFamily; } public BatchMutation getActions() throws FlumeException { BatchMutation bm=null; List //按約定的規(guī)則分隔數(shù)據(jù) String[] data=inputPattern.split(new String(payload,charset)); //以系統(tǒng)當前時間的納秒數(shù)為行鍵 String rowKey=String.valueOf(System.nanoTime()); Map try{ fieldName=new HashMap for(int i=0;i String column=cf+":" +colNames.get(i); //將數(shù)據(jù)插入到HBase中 mutations.add(new Mutation(false,wrap(column),wrap(data[i]),false)); } bm=new BatchMutation(wrap(rowKey), mutations); } catch (Exception e) {throw newFlumeException("Could not get row key!",e); } //返回要插入的數(shù)據(jù) return bm; } Flume配置文件中信息: #配置Sources agent.sources=s agent.sources.s.type=org.xy.flume.source.MySQLSource agent.sources.s.connection.url=jdbc:mysql://IP:port/database agent.sources.s.user=username agent.sources.s.password=password agent.sources.s.table=tablename agent.sources.s.jdbc.connection.driver_class=com.mysql.jdbc.Driver agent.sources.s.custom.query=SELECT * FROM tablename agent.sources.s.channels=c #配置Channel agent.channels=c agent.channels.c.type=memory agent.channels.c.capacity=10 000 agent.channels.c.transactionCapacity=1 000 #配置sinks agent.sinks.k.type=hbase agent.sinks.k.channel=c agent.sinks.k.table=hbaseTablename agent.sinks.k.columnFamily=columnFamilyName agent.sinks.k.serializer=org.apache.flume.sink.hbase.MyHbaseEventSerializer agent.sinks.k.serializer.colNames=columns 2.4 測試與結(jié)果分析 測試系統(tǒng)使用的環(huán)境由四臺機器組成的集群,其中安裝MySQL數(shù)據(jù)庫作為數(shù)據(jù)源的四臺相同配置,HBase集群中的HBase Master是一臺服務器。測試中使用的HBase版本為0.98,F(xiàn)lume版本為1.6.0。 為了最大限度地接近實際生產(chǎn)環(huán)境,使用的測試數(shù)據(jù)為1 700 000余條,總大小為320 M,分別在每個數(shù)據(jù)庫中存一份。三個Flume代理運行在服務器上,從三臺機器的數(shù)據(jù)庫中收集數(shù)據(jù),測試結(jié)果見表1。 表1 測試數(shù)據(jù) 由表1得到測試每個代理收集數(shù)據(jù)的平均時間為318 s,平均速度為1 007 kb/s,系統(tǒng)總速度為3 021 kb/s,且不會丟失數(shù)據(jù),在網(wǎng)絡暢通的情況下數(shù)據(jù)可靠性強。后期研究發(fā)現(xiàn)在通過JDBC查詢數(shù)據(jù)庫時,對于大量數(shù)據(jù),使用分頁查詢可以提高查詢效率,并且解決了隨著數(shù)據(jù)庫數(shù)據(jù)量增大而引起的Java虛擬機內(nèi)存溢出的問題。改變分頁大小,測試系統(tǒng)得到測試結(jié)果如圖4所示。 由圖4可以看出,當分頁為10 000(這個值與具體的數(shù)據(jù)總量有關)時,傳輸?shù)乃俣茸羁臁4讼到y(tǒng)可在5 min左右將510 000 000多萬條(大約9 600 MB)的數(shù)據(jù)收集完畢,并且沒有數(shù)據(jù)丟失,兼顧了數(shù)據(jù)傳輸?shù)男屎涂煽啃裕_到了預期的目標。 圖4 傳輸速度隨分頁大小變化的曲線 系統(tǒng)通過使用開源的工具Flume-ng 1.6.0實現(xiàn)了MySQL數(shù)據(jù)庫數(shù)據(jù)收集系統(tǒng),主要實現(xiàn)了其中的Source和Sink組件,完成了基于JDBC的自動查詢、檢測數(shù)據(jù)庫更新和解析數(shù)據(jù)寫入HBase的功能。豐富了Flume的功能,使得Flume可以在脫離Hadoop環(huán)境下進行數(shù)據(jù)傳輸。測試結(jié)果表明,系統(tǒng)可以高效可靠地收集數(shù)據(jù)庫中的數(shù)據(jù)。 [1] 孫韓林.一種基于云計算的網(wǎng)絡流量分析系統(tǒng)結(jié)構(gòu)[J].西安郵電大學學報,2013,18(4):75-79. [2] 李 芬,朱志祥,劉盛輝.大數(shù)據(jù)發(fā)展現(xiàn)狀及面臨的問題[J].西安郵電大學學報,2013,18(5):100-103. [3] Apache Hadoop[EB/OL]. 2015. http://hadoop.apache.org/. [4] 詹 玲,馬 駿,陳伯江,等.分布式I/O日志收集系統(tǒng)的設計與實現(xiàn)[J].計算機工程與應用,2010,46(36):88-90. [5] Apache Flume[EB/OL].2015.http://flume.apache.org/. [6] 王春梅.基于數(shù)據(jù)倉庫的數(shù)據(jù)挖掘技術[J].西安郵電學院學報,2006,11(5):99-102. [7] 王 博,陳莉君.Hadoop遠程過程調(diào)用機制的分析和應用[J].西安郵電學院學報,2012,17(6):74-77. [8] 孫 輝.MySQL查詢優(yōu)化的研究和改進[D].武漢:華中科技大學,2007. [9] 李現(xiàn)艷,趙書俊,初元萍.基于MySQL的數(shù)據(jù)庫服務器性能測試[J].核電子學與探測技術,2011,31(1):48-52. [10] 謝曉燕,張靜雯.一種基于Linux集群技術的負載均衡方法[J].西安郵電大學學報,2014,19(3):64-68. [11] Apache HBase[EB/OL].2015.http://hbase.apache.org/. [12] 楊寒冰,趙 龍,賈金原.HBase數(shù)據(jù)庫遷移工具的設計與實現(xiàn)[J].計算機科學與探索,2013,7(3):236-246. [13] Carstoiu D,Lepadatu E,Gaspar M.Hbase-non SQL database,performances evaluation[J].International Journal of Advancements in Computing Technology,2010,2(5):42-52. Automatic Collection System for MySQL Data Based on Flume YU Jin-liang1,ZHU Zhi-xiang1,LIANG Xiao-jiang2 (1.Xi’an University of Posts and Telecommunications,Xi’an 710061,China;2.Shaanxi Information Engineering Research Institute,Xi’an 710061,China) For data collecting in distributed systems or between different data centers,while addressing the issue of exchange data from a relational database to non-relational databases,an automatic collecting system for MySQL data based on Flume is put forword.In order to meet the real-world production environment,this system uses a star topology.It can automatically query a given MySQL database table,automatic delection of the data updating of the table,automatic incremental transfer,packaging and parsing to the original data,finally storing data into a database of HBase.In test,the average speed of transmission of every machine in the system can reach 1 111 kb/s,and the total speed of transmission can reach 3 333 kb/s,which ensure data integrity and achieve the goal of reliable and efficient transport. Flume;MySQL database;data collecting;HBase;JDBC 2016-01-11 2016-05-05 時間:2016-11-21 2015年工信部通信軟科學研究項目(2015-R-19);2015陜西省信息化技術研究項目(2015-002) 于金良(1991-),男,碩士研究生,研究方向為大數(shù)據(jù)分析處理;朱志祥,教授,研究方向為計算機網(wǎng)絡、信息化應用和網(wǎng)絡安全。 http://www.cnki.net/kcms/detail/61.1450.TP.20161121.1641.022.html TP274.2 A 1673-629X(2016)12-0137-05 10.3969/j.issn.1673-629X.2016.12.030>executeQuery() throws SQLException{
> rowsList=new ArrayList
>();
3 結(jié)束語