鄧杰 童孟軍 胡文澤 林英杰 胡燚
摘? 要: Mysql數(shù)據(jù)庫(kù)表切換過(guò)程,需要保證兩個(gè)相同表同時(shí)寫(xiě)入。Solr作為企業(yè)級(jí)的搜索框架,其自帶的索引更新系統(tǒng)是增量更新,實(shí)時(shí)性差。為了滿足互聯(lián)網(wǎng)企業(yè)對(duì)數(shù)據(jù)的實(shí)時(shí)性要求,設(shè)計(jì)和實(shí)現(xiàn)了一款基于Zookeeper的能夠準(zhǔn)實(shí)時(shí)更新Solr索引和實(shí)現(xiàn)數(shù)據(jù)庫(kù)雙寫(xiě)的系統(tǒng)。用戶(hù)需要提交配置信息到系統(tǒng),當(dāng)Mysql數(shù)據(jù)源發(fā)生內(nèi)容變更時(shí),系統(tǒng)就能夠?qū)崟r(shí)捕獲,將變更內(nèi)容經(jīng)過(guò)數(shù)據(jù)轉(zhuǎn)化并實(shí)時(shí)同步更新Solr索引。
關(guān)鍵詞: Zookeeper; Solr; Mysql; 實(shí)時(shí)同步; 分布式
中圖分類(lèi)號(hào):TP392? ? ? ? ? 文獻(xiàn)標(biāo)識(shí)碼:? ? ?文章編號(hào):1006-8228(2020)02-58-04
Quasi–real-time index update system using Zookeeper and its monitoring
Deng Jie, Tong Mengjun, Hu Wenze, Lin Yingjie, Hu Yi
(College of Information Engineering, Zhejiang A&F University, Hangzhou, Zhejiang, 311300, China)
Abstract: Mysql database table switching process, it is needed to ensure that two same tables are written at the same time. As an enterprise-class search framework, Solr's built-in index update system is incremental update with poor real-time performance. In order to meet the real-time requirements of Internet enterprises, a system is designed and implemented by using Zookeeper, which can quasi–real-timely update Solr index and realize database dual write. Users need to submit configuration information to the system. When the content of Mysql data source changes, the system can capture the changed content in real time, and update Solr index synchronously in real time through the data transformation.
Key words: Zookeeper; Solr; Mysql; real-time; synchronization
0 引言
當(dāng)前互聯(lián)網(wǎng)企業(yè)對(duì)數(shù)據(jù)的實(shí)時(shí)性要求越來(lái)越高。所以本文研究的內(nèi)容著眼于實(shí)時(shí)的數(shù)據(jù)同步,整個(gè)系統(tǒng)分為數(shù)據(jù)抓取分發(fā)平臺(tái)部分和索引更新部分。基于這個(gè)系統(tǒng)來(lái)解決下面兩方面的問(wèn)題。
⑴ 很多公司會(huì)將部分?jǐn)?shù)據(jù)冗余存在搜索平臺(tái)Solr上,當(dāng)用戶(hù)請(qǐng)求數(shù)據(jù)時(shí),直接從搜索平臺(tái)Solr上獲取而不是訪問(wèn)數(shù)據(jù)庫(kù)(因?yàn)閿?shù)據(jù)庫(kù)不支持復(fù)雜的搜索邏輯)。所以需要一款能夠?qū)崟r(shí)更新搜索索引的系統(tǒng),當(dāng)數(shù)據(jù)庫(kù)內(nèi)容發(fā)生變更時(shí),及時(shí)更新對(duì)應(yīng)的Solr上的索引。
⑵ 有些特殊情況需要替換線上運(yùn)行的數(shù)據(jù)庫(kù)mysql內(nèi)的某個(gè)表,但又因?yàn)槭蔷€上運(yùn)行的數(shù)據(jù)庫(kù),不應(yīng)該讓業(yè)務(wù)感知到這種變更,所以需要一個(gè)系統(tǒng)進(jìn)行漸進(jìn)的替換過(guò)程。
1 核心框架和相關(guān)技術(shù)介紹
1.1 系統(tǒng)運(yùn)用到的核心框架
zookeeper提供分布式協(xié)調(diào)服務(wù),提供諸如統(tǒng)一命名服務(wù)、配置管理和分布式鎖、分布式消息等分布式的基礎(chǔ)服務(wù)。它是一個(gè)典型的分布式數(shù)據(jù)一致性的解決方案,分布式應(yīng)用可以基于zookeeper實(shí)現(xiàn)發(fā)布/訂閱、負(fù)載均衡、命名服務(wù)、分布式協(xié)調(diào)/通知、集群管理/master選舉、分布式鎖和分布式隊(duì)列等功能[1]。功能強(qiáng)大又好用。
kafka是一款基于發(fā)布訂閱模型的消息隊(duì)列。它是一個(gè)能夠提供實(shí)時(shí)數(shù)據(jù)傳輸?shù)钠脚_(tái),具有高吞吐、低延遲的特點(diǎn)。使用它的原因:一是可以處理更多的消息,不受單臺(tái)服務(wù)器的限制,二是分區(qū)可作為并行處理的單元。這樣使所有的請(qǐng)求給多臺(tái)服務(wù)器處理[2]。
Kafka Connect是一種用于Kafka和其他數(shù)據(jù)系統(tǒng)之間進(jìn)行數(shù)據(jù)傳輸?shù)墓ぞ?。僅關(guān)注數(shù)據(jù)的復(fù)制,不處理其他任務(wù),對(duì)數(shù)據(jù)的傳輸進(jìn)行管理和監(jiān)控。
Debezium是Kafka Connect的一種實(shí)現(xiàn),主要用于數(shù)據(jù)庫(kù)和kafka之間的數(shù)據(jù)傳輸。它是一個(gè)CDC(Change Data Capture)系統(tǒng),能實(shí)時(shí)捕獲上游數(shù)據(jù)的變動(dòng)。然后記錄到一個(gè)或者多個(gè)Kafka topic。
solr是一款開(kāi)源的企業(yè)級(jí)搜索框架,其主要功能有全文檢索,分詞,拼音檢索等。通過(guò)solr索引,能夠在短時(shí)間從海量數(shù)據(jù)里得到用戶(hù)關(guān)心的數(shù)據(jù)。
1.2 系統(tǒng)運(yùn)用到的主要技術(shù)
⑴ Spring技術(shù)。Spring是現(xiàn)在非常流行的一個(gè)框架。SpringMVC是Spring的一個(gè)模塊,它提供完整的MVC模型解決方案[3]。本系統(tǒng)使用REST接口來(lái)管理同步任務(wù),通過(guò)使用Spring和SpringMVC能夠很方便、快速的搭建一個(gè)后端應(yīng)用,簡(jiǎn)化開(kāi)發(fā)流程[4]。
⑵ Java多線程技術(shù)。只有一個(gè)線程的系統(tǒng),運(yùn)行效率必定糟糕,運(yùn)用到多線程技術(shù)后可以同時(shí)運(yùn)行不同的同步任務(wù),大大的提高了運(yùn)行的效率[5]。
⑶ 分布式技術(shù)。本系統(tǒng)基于搶占式任務(wù)調(diào)度方式,保證同步任務(wù)的高可用,可以在不同機(jī)器運(yùn)行。
2 系統(tǒng)實(shí)現(xiàn)
2.1 實(shí)時(shí)數(shù)據(jù)管道
實(shí)時(shí)數(shù)據(jù)管道主要數(shù)據(jù)來(lái)自mysql。當(dāng)數(shù)據(jù)庫(kù)信息發(fā)生變更時(shí),mysql內(nèi)變更前和變更后的內(nèi)容會(huì)被發(fā)送到實(shí)時(shí)數(shù)據(jù)管道上,使各個(gè)接入方在極短時(shí)間內(nèi)收到數(shù)據(jù)庫(kù)變更信息。
實(shí)時(shí)數(shù)據(jù)管道由3個(gè)核心部分組成:CDC模塊、系統(tǒng)中間體Kafka和Schema registry。
圖1是一個(gè)簡(jiǎn)單的架構(gòu)圖,為了展示用戶(hù)如何通過(guò)管道得到數(shù)據(jù)變更。Debezium實(shí)時(shí)的抓取到mysql的數(shù)據(jù)變更并使用Avro將數(shù)據(jù)序列化得到schema和值,其中schema被提交到schema registry并返回id,之后id和序列化后的值一起發(fā)送到kafka。數(shù)據(jù)管道下游邏輯由開(kāi)發(fā)人員實(shí)現(xiàn):初始化一個(gè)KafkaConsumer,訂閱指定的Kafka Topic,根據(jù)拿到的數(shù)據(jù)schema id,從schema registry里獲取到對(duì)應(yīng)的schema,然后使用Avro將拉取到的數(shù)據(jù)的值和schema反序列化成一條消息交付給用戶(hù)。本系統(tǒng)中的實(shí)時(shí)數(shù)據(jù)管道部分還增加了基于組件JMX信息的監(jiān)控,用來(lái)實(shí)時(shí)觀察數(shù)據(jù)管道的狀況。
2.2 Solr索引和Mysql數(shù)據(jù)的同步系統(tǒng)SIS
SIS目的是解決Solr索引實(shí)時(shí)更新和數(shù)據(jù)庫(kù)雙寫(xiě)的問(wèn)題。由服務(wù)端和客戶(hù)端兩部分組成。用戶(hù)向SIS服務(wù)端提交任務(wù),SIS客戶(hù)端從SIS服務(wù)端監(jiān)聽(tīng)到新任務(wù)后,啟動(dòng)任務(wù)。
⑴ SIS服務(wù)端的實(shí)現(xiàn)
SIS服務(wù)端有提交同步任務(wù)、刪除同步任務(wù)、更新同步任務(wù)三個(gè)功能。
用戶(hù)向服務(wù)端提交任務(wù),服務(wù)端創(chuàng)建/sis/task和/taskX節(jié)點(diǎn)。/sis/task節(jié)點(diǎn)為SIS同步任務(wù)根節(jié)點(diǎn),/sis/task/taskX表示一個(gè)同步任務(wù)節(jié)點(diǎn),其配置信息都會(huì)保存在自己內(nèi)部。
新任務(wù)提交的過(guò)程的概括如下:首先SIS服務(wù)端啟動(dòng)時(shí),首先會(huì)嘗試向zookeeper注冊(cè)/sis/task持久節(jié)點(diǎn)。然后用戶(hù)提交一份同步任務(wù)的配置內(nèi)容到SIS服務(wù)端。SIS服務(wù)端收到任務(wù)創(chuàng)建請(qǐng)求后,向zk創(chuàng)建/sis/task/taskX 持久節(jié)點(diǎn),其中taskX為同步任務(wù)的名稱(chēng)。同時(shí),同步任務(wù)的配置信息會(huì)被保存到/sis/task/taskX節(jié)點(diǎn)內(nèi)。Zookeeper提供的分布式協(xié)調(diào)功能對(duì)同一個(gè)節(jié)點(diǎn)的多個(gè)創(chuàng)建請(qǐng)求,只會(huì)有一個(gè)請(qǐng)求能成功,這也保證了不會(huì)有多個(gè)相同任務(wù)被創(chuàng)建。
⑵ SIS服務(wù)端的設(shè)計(jì)
SIS客戶(hù)端的設(shè)計(jì)圍繞zookeepe展開(kāi),它負(fù)責(zé)同步任務(wù)組件創(chuàng)建,調(diào)度器初始化等工作。這里的實(shí)現(xiàn)非常復(fù)雜,在這里簡(jiǎn)單的闡述客戶(hù)端任務(wù)的情況。
圖2是SIS客戶(hù)端創(chuàng)建任務(wù)工作流程。其中Client表示SIS服務(wù)端,它們以集群的形式運(yùn)行,每一個(gè)Client都是對(duì)等的。
⑴ 每個(gè)SIS客戶(hù)端啟動(dòng)時(shí),會(huì)向/sis/task節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)該節(jié)點(diǎn)子節(jié)點(diǎn)變化情況。當(dāng)/sis/task子節(jié)點(diǎn)增加,刪除,內(nèi)容更新時(shí)SIS客戶(hù)端會(huì)收到通知。
⑵ SIS服務(wù)端根據(jù)用戶(hù)創(chuàng)建任務(wù)請(qǐng)求,創(chuàng)建/sis/task/taskX 任務(wù)節(jié)點(diǎn),其中taskX為同步任務(wù)的名稱(chēng),它是一個(gè)持久節(jié)點(diǎn)。
⑶ 所有SIS客戶(hù)端都會(huì)得到節(jié)點(diǎn)/sis/task/taskX被創(chuàng)建的消息。SIS客戶(hù)端收到回調(diào)之后,都會(huì)向/sis/task/taskX節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器。Zookeeper保證只會(huì)有一個(gè)客戶(hù)端請(qǐng)求成功,開(kāi)始任務(wù)同時(shí)將同步任務(wù)的運(yùn)行狀態(tài)寫(xiě)入到lock節(jié)點(diǎn)內(nèi)部。
⑷ 如果此時(shí)運(yùn)行同步任務(wù)taskX的服務(wù)器發(fā)生宕機(jī), 那么SIS和zookeeper的連接將會(huì)斷開(kāi),并且lock臨時(shí)節(jié)點(diǎn)將自動(dòng)刪除。剩余的SIS客戶(hù)端由于添加了對(duì)同步任務(wù)的監(jiān)聽(tīng)器會(huì)收到同步任務(wù)中斷的通知,又開(kāi)始對(duì)/sis/task/taskX任務(wù)節(jié)點(diǎn)加鎖。加鎖成功的SIS客戶(hù)端,從任務(wù)節(jié)點(diǎn)讀取配置信息,重新啟動(dòng)同步任務(wù)?;谶@個(gè)機(jī)制實(shí)現(xiàn)SIS同步任務(wù)的高可用。
⑸ 如果用戶(hù)主動(dòng)提交刪除任務(wù)請(qǐng)求,那么SIS服務(wù)端首先將/sis/task/taskX/lock鎖節(jié)點(diǎn)的狀態(tài)信息更新為WAIT_FOR_CLOSE,這表示該任務(wù)節(jié)點(diǎn)等待刪除,隨后刪除鎖節(jié)點(diǎn)和/sis/task/taskX任務(wù)節(jié)點(diǎn),之后所有在/sis/task/taskX任務(wù)節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器的SIS客戶(hù)端都會(huì)收到鎖節(jié)點(diǎn)被用戶(hù)主動(dòng)刪除的通知,但都不做任何響應(yīng)。任務(wù)節(jié)點(diǎn)刪除之后,所有向/sis/task節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器的SIS客戶(hù)端收到任務(wù)節(jié)點(diǎn)被刪除的通知,SIS客戶(hù)端根據(jù)通知內(nèi)容判斷同步任務(wù)是否運(yùn)行在自己所在服務(wù)器來(lái)同步任務(wù)和清理資源。
⑹ 如果用戶(hù)主動(dòng)提交更新同步任務(wù)配置信息請(qǐng)求,那么SIS服務(wù)端會(huì)更新/sis/task/taskX的節(jié)點(diǎn)內(nèi)容。隨后所有在/sis/task節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器的SIS客戶(hù)端都會(huì)收到通知,并根據(jù)通知獲取到具體哪個(gè)任務(wù)節(jié)點(diǎn)需要更新,隨后更新/sis/task/taskX/lock鎖節(jié)點(diǎn)狀態(tài)為NEED_UPDATE。之后所有在/sis/task/taskX節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器的SIS客戶(hù)端收都會(huì)收到鎖節(jié)點(diǎn)內(nèi)容被更新的通知,并判斷對(duì)應(yīng)的同步任務(wù)是否在自己所在的服務(wù)器,如果是則再次判斷任務(wù)狀態(tài),如果為NEED_UPDATE,那么就停止老的同步任務(wù),清理資源,刪除同步任務(wù)下的鎖節(jié)點(diǎn)。鎖節(jié)點(diǎn)被刪除后,和第⑷步類(lèi)似,創(chuàng)建新的同步任務(wù)。
上述六個(gè)步驟概述了SIS客戶(hù)端對(duì)任務(wù)的調(diào)度過(guò)程,基于zookeeper的SIS客戶(hù)端和服務(wù)端的實(shí)現(xiàn),讓SIS同步任務(wù)能夠高可用,即使某一臺(tái)服務(wù)器宕機(jī),同步任務(wù)也不會(huì)中斷。
2.3 實(shí)時(shí)數(shù)據(jù)管道和SIS
實(shí)時(shí)數(shù)據(jù)管道可以應(yīng)用于以下的場(chǎng)景:數(shù)值統(tǒng)計(jì)、實(shí)時(shí)數(shù)據(jù)分析、響應(yīng)式編程。實(shí)時(shí)數(shù)據(jù)管道能夠讓開(kāi)發(fā)人員實(shí)現(xiàn)實(shí)時(shí)ETL(Extract-Transform-Load),提供實(shí)時(shí)、無(wú)限的數(shù)據(jù)流。
SIS可以解決solr索引更新延時(shí)大的缺點(diǎn),實(shí)現(xiàn)索引的實(shí)時(shí)更新。并且還能夠完美解決Mysql雙寫(xiě)需要開(kāi)發(fā)人員在項(xiàng)目代碼里添加額外代碼,實(shí)現(xiàn)數(shù)據(jù)寫(xiě)入兩個(gè)庫(kù)的問(wèn)題。
如下是數(shù)據(jù)管道和SIS相結(jié)合實(shí)現(xiàn)mysql雙寫(xiě)。
現(xiàn)在在同一個(gè)數(shù)據(jù)庫(kù)里有2張表,分別是userinfo和test表。本系統(tǒng)將實(shí)現(xiàn):當(dāng)userinfo表有內(nèi)容變更時(shí),test表能立刻同步。用戶(hù)提交配置內(nèi)容到同步服務(wù)器,指定userinfo表的變更需要被同步到test表,點(diǎn)擊create創(chuàng)建數(shù)據(jù)庫(kù)雙寫(xiě)同步任務(wù)。
提交的配置信息如圖3,指定SIS同步的數(shù)據(jù)來(lái)自數(shù)據(jù)管道userinfo相關(guān)的topic。中間有多個(gè)處理過(guò)程包括數(shù)據(jù)的冗余,轉(zhuǎn)換。最后數(shù)據(jù)會(huì)被寫(xiě)入mysql的test表。
從圖4可以看到,Userinfo表里userid為1的數(shù)據(jù)變更前timestamp字段的值為null,當(dāng)行內(nèi)容有更新時(shí),timestamp值會(huì)被自動(dòng)更新為內(nèi)容更新的時(shí)間。
更新userinfo表userid為1的行數(shù)據(jù),將username更新為name111,如圖5所示。timestamp字段的值被更新為此行內(nèi)容變更時(shí)的時(shí)間2018-05-15 23:37:03。因?yàn)橹皠?chuàng)建了mysql雙寫(xiě)同步任務(wù),所以u(píng)serinfo的變更內(nèi)容,會(huì)被同步到test表。test表的username值變更為name111,并且timestamp為test表userid為1的行變更時(shí)的時(shí)間 2018-05-14 23:37:04。和userinfo的timestamp值2018-05-15 23:37:03相比,同步userinfo變更內(nèi)容到test表,只花費(fèi)了1秒。
2.4 對(duì)比
圖6⑴是使用本系統(tǒng)后的效果圖。highwater表示每個(gè)時(shí)間點(diǎn)mysql總共有多少變更數(shù)據(jù),offset表示當(dāng)前消費(fèi)的數(shù)據(jù)量。從中可以看到,2條線是重合的,也就是說(shuō)在每個(gè)時(shí)間點(diǎn)的mysql變更,本系統(tǒng)都能夠?qū)崟r(shí)的處理消費(fèi)。由于監(jiān)控的原因,offset可能會(huì)高于highwater,offset高于highwater表示SIS消費(fèi)是完全跟上了mysql變更。
圖6⑵是不使用本系統(tǒng)而是使用增量的方式處理消息的延時(shí)圖??梢钥吹较旅娴木€offset總是經(jīng)過(guò)一段時(shí)間后才上漲,而不能做到實(shí)時(shí)的和highwater保持一致。通過(guò)對(duì)比,使用了本系統(tǒng)后,能夠做到數(shù)據(jù)的實(shí)時(shí)處理。
3 總結(jié)
本文主要研究了一個(gè)通用的,能夠服務(wù)于不同系統(tǒng)的數(shù)據(jù)同步系統(tǒng)。通過(guò)本系統(tǒng),開(kāi)發(fā)人員只需要編寫(xiě)一份簡(jiǎn)單的描述文件,說(shuō)明要同步的數(shù)據(jù)從哪來(lái)、到哪去,比如指定需要同步的數(shù)據(jù)是哪個(gè)mysql的表,這些數(shù)據(jù)會(huì)被同步到solr還是mysql的另一個(gè)新表等。系統(tǒng)根據(jù)配置內(nèi)容就能自動(dòng)同步。有了此系統(tǒng)能夠讓開(kāi)發(fā)人員專(zhuān)注于業(yè)務(wù)開(kāi)發(fā)而不需要花費(fèi)大量的精力在業(yè)務(wù)之外的代碼編寫(xiě)上,提高了開(kāi)發(fā)效率。
參考文獻(xiàn)(References):
[1] 倪超.從Paxos到Zookeeper:分布式一致性原理與實(shí)踐[M].北京:機(jī)械工業(yè)出版社,2015.
[2] 牟大恩.Kafka入門(mén)與實(shí)踐[M].人民郵電出版社,2017:59-89
[3] Craig Walls.Spring實(shí)戰(zhàn)(第4版) [M].人民郵電出版社,2016:187-205
[4] 明日科技.Java Web從入門(mén)到精通 [M].清華大學(xué)出版社,2012:78-89
[5] 葛一鳴,郭超.實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)[M].電子工業(yè)出版社,2015:100-110
[6] 鳥(niǎo)哥.鳥(niǎo)哥的Linux私房菜[M].人民郵電出版社,2010:120-150
[7] 克雷格·沃斯.Spring Boot實(shí)戰(zhàn)[M].人民郵電出版社,2016:93-134
[8] Bruce Eckel.Java編程思想(第4版)[M].機(jī)械工業(yè)出版社,2007:135-150
[9] 瘋狂軟件.Spring+MyBatis企業(yè)應(yīng)用實(shí)戰(zhàn)[M].電子工業(yè)出版社,2017:87-102
[10] Raoul-Gabriel Urma, Mario Fusco, Alan Mycroft.Java 8 in Action[M].USA:Manning,2014:153-160