林德煜
(中移互聯(lián)網(wǎng)有限公司,廣東 廣州 510653)
消息隊列中間件是一種為不同系統(tǒng)或同一系統(tǒng)內(nèi)不同模塊提供可靠異步網(wǎng)絡(luò)通信的分布式框架,接收來自上游服務(wù)的消息,存儲后轉(zhuǎn)發(fā)至下游服務(wù),在系統(tǒng)架構(gòu)中起著承上啟下的作用[1]。Kafka 是一個處理海量數(shù)據(jù)的分布式消息系統(tǒng),具有高效的數(shù)據(jù)傳輸速率,相對于其他的消息隊列系統(tǒng)具有較高的性能,采用發(fā)布/訂閱模式,具有較強的可靠性、海量數(shù)據(jù)處理能力以及可拓展性,是不少業(yè)務(wù)平臺選型隊列和削峰填谷功能的很好選擇[2]。
通常為了避免單點問題,高并發(fā)業(yè)務(wù)平臺通常需要滿足多節(jié)點部署。例如,業(yè)務(wù)平臺有2 個節(jié)點A和節(jié)點B 同時對外提供服務(wù),節(jié)點A 出現(xiàn)故障需要容災(zāi)切換時,通常會將網(wǎng)關(guān)入口全部切換到B 節(jié)點,如果A 節(jié)點中Kafka 隊列存在未消費消息時,為了不影響業(yè)務(wù),需要將A 節(jié)點的未消費數(shù)據(jù)在用戶無感知的情況下同步到B 節(jié)點,并在B 節(jié)點繼續(xù)消費。
Kafka 支持集群部署,它的數(shù)據(jù)由broker 負(fù)責(zé)存儲和同步。Kafka broker 可以對隊列(Topic)進(jìn)一步分片,Producer 負(fù)責(zé)向broker 推送數(shù)據(jù),Consumer 負(fù)責(zé)從broker 消費數(shù)據(jù)。
Kafka 支持消費者以不同的消費組消費相同的Topic,如圖1 所示。圖1 中:每個Topic 可以分為多個分區(qū),如P0、P1、P2、P3;每個服務(wù)器負(fù)責(zé)部分分區(qū),一個消費組中只有一個消費者能消費到特定分區(qū),不同的消費組內(nèi)消費者可以重復(fù)消費同一個特定分區(qū),消費者可以同時消費多個分區(qū)。
圖1 Kafka 多消費組消費
在實現(xiàn)數(shù)據(jù)同步的過程中,按照數(shù)據(jù)源節(jié)點的流量主要分為單向同步和雙向同步[3]。對于要求實現(xiàn)業(yè)務(wù)雙活節(jié)點的平臺,一般需要實現(xiàn)雙向同步。
Kafka 數(shù)據(jù)包含隊列數(shù)據(jù)(包括隊列自身的Offset)和消費組Offset 數(shù)據(jù)。這2 部分?jǐn)?shù)據(jù)的同步對一個高可用系統(tǒng)來說至關(guān)重要。數(shù)據(jù)同步指當(dāng)某一節(jié)點服務(wù)器產(chǎn)生一條數(shù)據(jù)時,需要把該數(shù)據(jù)實時同步到其他的節(jié)點中,以便其他節(jié)點完成必要的工作或提供相關(guān)服務(wù)[4]。基于性能考慮,通常采用定期同步的方式,將隊列數(shù)據(jù)和Offset 數(shù)據(jù)從A 節(jié)點同步到備用B 節(jié)點。
為實現(xiàn)Kafka 不同節(jié)點的數(shù)據(jù)同步,可以在2 個節(jié)點之間引入中間件,模擬Kafka A 節(jié)點的消費者和B 節(jié)點的生產(chǎn)者,定期將Kafka 對應(yīng)的消息數(shù)據(jù)、Topic 最新Offset 和消費組消費Offset 同步到B 節(jié)點。
了解了Kafka 的消費原理之后,可以選擇官方/開源的第三方框架實現(xiàn)異地節(jié)點,也可以自己開發(fā)一套中間件實現(xiàn)類似功能,二者的優(yōu)缺點都很明顯。
(1)選擇官方或開源的第三方工具。其優(yōu)點是可靠性相對較高,具有一定的生態(tài)成熟度,資料文檔相對完善,接入業(yè)務(wù)的時間相對較短。其缺點是相比自研工具,工具版本的更新相對不可控。
(2)選擇自研工具。其優(yōu)點是版本更新迭代和代碼完全自主可控,缺點是開發(fā)周期時間長,成熟度需要時間。
綜合時間、成熟度、業(yè)務(wù)需求考慮,本次研究采用了基于官方方案做優(yōu)化改進(jìn)的策略。
目前,最常用Kafka 跨節(jié)點同步工具是Kafka官方自帶的Mirror Maker。Mirror Maker 在異地數(shù)據(jù)同步中廣泛使用,可靠性和成熟度較高。目前,Mirror Maker 最新的版本為Mirror Maker2(以下簡稱MM2)。MM2 基于Kafka Connect 實現(xiàn),支持跨節(jié)點復(fù)制Topics 數(shù)據(jù)以及配置信息,也支持復(fù)制消費組及其消費Topic 的Offset 信息;MM2 相比Mirror Maker有較大的優(yōu)化和改善,對于同一個Topic 在不同節(jié)點中配置不同的前綴,同步時識別消息歸屬,從而解決回環(huán)問題。
通常Kafka 同步組件自身并不具備良好的進(jìn)度檢測,僅監(jiān)控組件自身進(jìn)程無法確定Kafka 是否已完成數(shù)據(jù)的同步。在實際應(yīng)用中,引入一種基于滑動時間窗口的同步延遲檢測算法,基于該算法開發(fā)腳本工具M(jìn)Q_Sync_Monitor,只需要在源節(jié)點部署一套,負(fù)責(zé)從源節(jié)點A 到目標(biāo)節(jié)點B 的同步延遲檢測。
滑動窗口指以固定窗口為單位不斷進(jìn)行更新,如果滑動窗口已滿,那么最先進(jìn)入滑動窗口的一個固定窗口被刪除,滑動窗口隨之更新一次[5]。
MM2 消費組Offset 同步時間配置字段為sync.group.offsets.interval.seconds,定義定期同步的時間為mq_sync_interval_seconds(以下簡稱MQ 同步時間MST)。該參數(shù)通常配置等于sync.group.offsets.interval.seconds,MQ 同步工具負(fù)責(zé)啟動MQ 數(shù)據(jù)的同步,因為涉及A 節(jié)點和B 節(jié)點兩邊跨節(jié)點的輸入輸出(Input/Output,I/O)操作,該操作通常需要超過1 s 才能完成。
本算法處理周期保持跟MST 一致,稱其為算法處理時間PT。
MQ 同步的Offset 數(shù)據(jù)不一致:MQ 對應(yīng)的數(shù)據(jù)從A 節(jié)點同步到B 節(jié)點時,假設(shè)A 的某個Topic 最大Offset 為A_maxOffsetLast=10 000,消費者Z 對應(yīng)的消費組Offset 為A_consumeOffset=8 000。
完全同步到B 節(jié)點之后,B 節(jié)點查詢得到的該Topic 最大Offset 和消費者Z 對應(yīng)的消費組Offset 可能為B_maxOffsetLast=5 000、B_consumeOffset=3 000。因此,A 和B 對應(yīng)的Offset 數(shù)據(jù)通常不對等。
算法原理:MQ 同步非實時,所設(shè)計B 的跨度時間要包含A 的跨度時間,假設(shè)A 的Offset 增加值為A_Sub,如果B 在跨度時間內(nèi)的Offset 增加值小于A_Sub,則說明存在同步延遲問題?;瑒訒r間活動窗口如圖2 所示。考慮間隔MPT 的同步操作可能剛好在算法處理時間PT 之前1 s 內(nèi)執(zhí)行,而通常同步操作可能需要超過1 s 才完成,所以即使是B 的2 次算法處理時間PT,Offset 同步上限仍然沒辦法確保包含A的1 次算法處理時間PT,因此需要計算B 的3 次算法處理時間PT。
圖2 檢測滑動窗口
實現(xiàn)中以4 個MQ 同步時間MST 為滑動時間窗口,假如MQ 的A 間隔一次算法處理時間PT 對應(yīng)的Offset 差有變動(假設(shè)差值為A_Sub),而B 對應(yīng)的差值小于A_Sub,即B[times]-B[times-3]<A_SUB,則產(chǎn)生警告。
算法邏輯如圖3 所示。
圖3 MQ 同步延遲檢測
首先,進(jìn)行初始化。其代碼為
針對每個topic,包含4 個long 類型數(shù)組和一個long 類型參數(shù)
A_maxOffset//代表A 節(jié)點某topic 對應(yīng)的Offset
A_consumeOffset//代表A 消費Offset
B_maxOffset//代表B 某個topic 對應(yīng)的Offset
B_consumeOffset//代表B 消費Offset
times=0 算法的實現(xiàn)需考慮times 超過最大值的處理。同時,建議數(shù)組采用循環(huán)數(shù)組方式,只保留最新4 個元素。
其次,定時執(zhí)行進(jìn)度監(jiān)控,間隔時間為PT,該值和MPT 時間保持一致。
通過調(diào)用MQ 提供的腳本查詢A 和B 節(jié)點MQ對應(yīng)Topic 的Offset 和對應(yīng)客戶端消費組Offset。
對于單位時間內(nèi)A 節(jié)點自身的Offset 變化較小(如變化為0),為節(jié)省計算資源,可以選擇不做判斷處理,分別引入Min_Delay_Latest_Offset 和Min_Delay_Consume_Offset 作為topic 同一個分片最新Offset 和消費組消費Offset 同步延遲檢測判斷閾值。
如果times ≥3(B 的PT 最少要從3 開始計算),則啟動判斷:判斷A 節(jié)點的A_maxOffset[times-2]和A_maxOffset[times-3]的差值A(chǔ)_maxOffsetSub 是否大于Max_Delay_Max_Offset,如是,則判斷B_maxOffset[times]和B_maxOffset[times-3]的差值是否小于A_maxOffsetSub,如是,提示當(dāng)前topic 最大Offset 同步延遲告警;判斷A 節(jié)點的A_consumeOffset[times-2]和A_consumeOffset[times-3]的差值A(chǔ)_consumeOffsetSub是否大于Max_Delay_Max_Offset,如是,則判斷B_consumeOffset[times] 和B_consumeOffset[times-3] 的差值是否小于A_consumeOffsetSub,如是,提示當(dāng)前topic 消費組Offset 同步延遲告警。無論如何,執(zhí)行times++。
在實際生產(chǎn)中,為了實現(xiàn)消息隊列跨節(jié)點容災(zāi),采用Kafka MM2 實現(xiàn)消息數(shù)據(jù)從源節(jié)點到目標(biāo)節(jié)點的同步。同時,采用腳本語言,基于上述MQ 同步延遲檢測算法實現(xiàn)了MQ 同步延遲檢測工具M(jìn)Q _Sync_Monitor。
在每個源節(jié)點部署一套MQ _Sync_Monitor 腳本工具,負(fù)責(zé)同步延遲的檢測。需要注意的是,要實時監(jiān)控MQ 同步工具和MQ _Sync_Monitor 腳本工具自身進(jìn)程。生產(chǎn)中,可基于配置zabbix 或Prometheus 等工具實現(xiàn)進(jìn)程的監(jiān)控。通過實現(xiàn)該算法,當(dāng)MQ 出現(xiàn)容災(zāi)異地切換時,可以較好地保障消息數(shù)據(jù)的一致性,從而保障業(yè)務(wù)的高可用性。
為了實現(xiàn)中移互聯(lián)網(wǎng)有限公司業(yè)務(wù)消息隊列異地容災(zāi),采用Kafka 官方MM2 同步工具實現(xiàn)源節(jié)點到目標(biāo)節(jié)點的數(shù)據(jù)同步,并引入了一種基于滑動時間窗口的同步延遲檢測算法,基于該算法實現(xiàn)Kafka 跨節(jié)點同步檢測工具。該工具在生產(chǎn)實踐中很好地解決了高可用分布式系統(tǒng)中Kafka 集群跨節(jié)點同步延遲檢測的盲區(qū),并為內(nèi)部其他項目的容災(zāi)提供了新的思路和借鑒。