亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        Spark平臺中Kafka偏移量的讀取管理與設計

        2019-10-08 08:34:58高宗寶劉麗美張家銘宋國興
        軟件 2019年7期
        關鍵詞:越界副本偏移量

        高宗寶 劉麗美 張家銘 宋國興

        摘? 要: 隨著移動互聯(lián)網(wǎng)技術的大規(guī)模發(fā)展,創(chuàng)新型互聯(lián)網(wǎng)公司和迭代型各行各業(yè)應用產(chǎn)品層出不窮,門戶訪問、好友互動等操作產(chǎn)生的大規(guī)模日志記錄,對大數(shù)據(jù)處理的實時性、準確性和高可用性發(fā)起了挑戰(zhàn)。Kafka是一種高吞吐量分布式發(fā)布訂閱消息系統(tǒng),其在高并發(fā)數(shù)據(jù)讀寫方面優(yōu)勢明顯,但其提供的數(shù)據(jù)消費方式存在數(shù)據(jù)丟失和重復的風險。本文首先介紹Kafka架構及其Offset管理,介紹了新型流式數(shù)據(jù)處理框架SparkStreaming與Kafka的結合,并說明了Kafka數(shù)據(jù)消費方面存在的缺陷,最后提出了一種基于SparkStreaming讀取Kafka的近似Exactly Once方案實現(xiàn)。通過搭建實驗環(huán)境進行對比測試,驗證了該設計可以在保證數(shù)據(jù)讀取效率的前提下確保數(shù)據(jù)的準確性。

        關鍵詞: Kafka;Offset;SparkStreaming;數(shù)據(jù)準確性

        中圖分類號: TP302? ? 文獻標識碼: A? ? DOI:10.3969/j.issn.1003-6970.2019.07.022

        【Abstract】: With the large-scale development of mobile Internet technology, the application products of various industries emerge in an endless stream. The large-scale log records generated by portal access, friend interaction and other operations challenge the real-time, accuracy and high availability of large data processing. Kafka is a high throughput distributed publish-subscribe messaging system, which has obvious advantages in high concurrent data reading and writing, but its data consumption mode has the risk of data loss and duplication. Firstly, this paper introduces Kafka architecture and its Offset management, introduces the combination of SparkStreaming and Kafka, a new streaming data processing framework, and illustrates the shortcomings of Kafka data consumption. Finally, an approximate Exactly One scheme based on SparkStreaming to read Kafka is proposed. By building an experimental environment for comparative testing, it is verified that the design can ensure the accuracy of data on the premise of ensuring the efficiency of data reading.

        【Key words】: Kafka; Offset; SparkStreaming; Data accuracy

        0? 引言

        隨著IT和移動互聯(lián)網(wǎng)技術的飛速發(fā)展,互聯(lián)網(wǎng)[1]軟件產(chǎn)品迭代開發(fā)、層出不窮,數(shù)據(jù)量激增,如何存儲和及時處理這些海量數(shù)據(jù),挖掘其中企業(yè)比較感興趣的價值信息(如用戶喜好等)進而提供更好的產(chǎn)品服務(如好友推薦、產(chǎn)品推廣等)是數(shù)據(jù)導向型公司迫切需要解決的問題。門戶網(wǎng)站訪問、好友聊天、支付交易記錄等用戶操作產(chǎn)生的大規(guī)模日志記錄,對大數(shù)據(jù)處理的實時性和高并發(fā)性發(fā)起了挑戰(zhàn)。傳統(tǒng)的數(shù)據(jù)存儲介質(zhì),如關系型數(shù)據(jù)庫、文件系統(tǒng)等無法滿足數(shù)據(jù)實時讀寫傳輸和流處理,Apache Kafka應運而生。Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺[2],是主要用Scala編寫的一種高吞吐量分布式發(fā)布訂閱消息系統(tǒng),因其擴展性好、高吞吐量、快速持久化、高可用性等優(yōu)點被各大消息系統(tǒng)、日志分析平臺、流數(shù)據(jù)處理平臺、門戶網(wǎng)站等廣泛使用。

        1? Kafka簡介

        1.1? Kafka架構

        Kafka消息系統(tǒng)的基本架構如圖1所示。其架構主要包括以下幾個組件:

        (1)Message:消息,通信基本單位。

        (2)Broker:Kafka節(jié)點實例,對應為Kafka集群的一臺機器。

        (3)Topic:主題,表示Kafka數(shù)據(jù)處理的消息源,數(shù)據(jù)的讀寫都要指定主題。

        (4)Producer:數(shù)據(jù)生產(chǎn)者,向某個Topic發(fā)布消息的對象,即一種push操作,將消息推送給代理對象Broker進行存儲。

        (5)Consumer:數(shù)據(jù)消費者,訂閱某個Topic并處理消息的對象,即一種pull操作,主動拉去數(shù)據(jù),Consumer自己控制消息的讀取速度和數(shù)量,如果Topic中沒有數(shù)據(jù),那么會周期性的pull操作直到有數(shù)據(jù)產(chǎn)生。

        (6)Partition:分區(qū),一個Topic可以有多個partition,一個消息實際存儲在Topic的某個Partition中,每個Parition可以保證消息的有序性。

        (7)Replications:分區(qū)副本,每個分區(qū)都可以設定副本數(shù)目分布到不同的Broker中以便于容錯。

        (8)ConsumerGroup:消費者組,一組consumer的集合,group訂閱的某個topic下的每個分區(qū)只能被其中的一個consumer消費,不會出現(xiàn)一個分區(qū)的數(shù)據(jù)被同一個group下的多個consumer消費的情況,可以理解為ConsumerGroup是Kafka提供的可擴展且具有容錯性的消費者機制,在開發(fā)過程中使用group.id來標識。

        Kafka集群中的所有節(jié)點都是平等的,不采用Master-Slave結構,這樣就不會出現(xiàn)類似HDFS的單點故障問題。Kafka利用zookeeper來解決分布式一致性問題,將broker節(jié)點、topic元數(shù)據(jù)信息等全部存儲到zookeeper中。

        為了保證較高的讀寫效率,對于每個partition,消息讀寫都有一個固定的副本完成,即Leader節(jié)點,其他的副本是Follower節(jié)點。Follower節(jié)點會定期同步Leader節(jié)點的數(shù)據(jù)。

        當使用工具kafka-topics.sh創(chuàng)建topic后,kafka會根據(jù)選舉策略對每個partition都選出一個Leader節(jié)點和相應數(shù)量的Follower節(jié)點(通過參數(shù)replication- factor控制副本數(shù)量)。圖2描述的是創(chuàng)建主題t1,分區(qū)數(shù)量為4,副本數(shù)量為3的情況。

        以partition=1為例,其讀寫節(jié)點是437(broker.id),副本節(jié)點分別是437、441、436,副本同步隊列分別是436、440、441。ISR(in-sync replica,副本同步隊列)是由Leader維護的與主節(jié)點數(shù)據(jù)同步的一個節(jié)點集合,當producer發(fā)送消息到leader后,follower會同步消息,如果某個follower沒有同步

        leader的消息太多或者失效,那么leader會將其從ISR中剔除。

        當leader失效后,kafka會從ISR中的副本中選舉出新的leader以保證服務的可用。

        1.2? 讀寫offset管理

        Topic可以簡單理解為一個queue,消息的生產(chǎn)與消費都要聲明消息所在的queue。為了提高數(shù)據(jù)讀寫效率和數(shù)據(jù)吞吐量,在物理上Topic被分成了多個partition,每個partition表示一個文件夾,命名為“topic名-分區(qū)號”,每個文件夾中保存消息數(shù)據(jù)、消息索引等。

        任何發(fā)布到partition的消息都會被append到文件尾部,每條消息在文件中的存儲位置稱之為偏移量(offset,long型整數(shù)),通過partition+offset可以唯一標識一條消息。因為是追加操作,所以在partition中消息是有序寫入磁盤的,其寫入和索引讀取效率都很高。圖3表明了一個分區(qū)數(shù)量為3的topic消息寫入狀態(tài)。

        當消息寫入時,kafka會按照默認規(guī)則規(guī)定消息會被寫入到哪個partition中。如果自定義規(guī)則合理,那么可以保證消息被均勻地分布到broker中。

        可以看出,消息的消費,核心是對partition和offset的管理。Kafka由ConsumerGroup控制消息的消費和偏移量,而不是交給Broker去存儲,甚至可以加以控制回到一個之前的偏移位置再次消費消息。

        Kafka提供了自動和手動2種偏移量管理方式[4,5]。

        Kafka默認會定期自動提交偏移信息,即enable. auto.commit=true。在kafka0.10版本之前,offset信息提交到zk中保存,但由于zk不適合大批量數(shù)據(jù)的并行讀寫操作,自kafka0.10版本,offset信息自動提交到名為__consumer_offsets的topic存儲。該topic默認有50個分區(qū),保存了每個ConsumerGroup消費的Topic所有partition的offset信息,如圖4所示。

        當然也可以采用手動更新的方法提交offset。

        在消息消費過程中,Kafka提供了如下3種可能的傳輸保障(consumer delivery guarantee)。

        (1)At most once:這種模式下,消息可能會丟,但是絕對不會重復消費。如果consumer設定autocommit偏移量,consumer在讀取到數(shù)據(jù)后立即更新offset后未來得及處理消息(如consumer系統(tǒng)崩潰),下次重新工作時無法讀取之前未處理的消息,導致數(shù)據(jù)丟失。

        (2)At least once:這種模式下,數(shù)據(jù)不會丟失,但是可能會存在重復消費。consumer在讀取到數(shù)據(jù)后立即處理,處理完成后沒來得及提交偏移量。下次重新工作時還會重新讀取已處理但是沒有提交偏移量的數(shù)據(jù),導致數(shù)據(jù)重復。

        (3)Exactly once:這種模式下,數(shù)據(jù)既不會丟失也不會重復消費,需要協(xié)調(diào)消費數(shù)據(jù)和offset進行精確事務管理,如將數(shù)據(jù)和offset信息寫入到HDFS等外部介質(zhì)中,這種模式對處理效率有一定影響。

        2? SparkStreaming簡介

        SparkStreaming是基于spark的流式批處理引擎,可以實現(xiàn)高吞吐量的、具備容錯機制的實時流數(shù)據(jù)的處理,能夠與RDD算子、機器學習、SparkSQL以及圖形圖像處理框架無縫連接[3-6]。類似于Apache Storm,用于流式數(shù)據(jù)的處理。SparkStreaming支持多種數(shù)據(jù)源,如Flume、Kafka、HDFS、套接字等,經(jīng)過一系列RDD算子或windows等高級函數(shù)進? 行處理后,將結果寫入到文件系統(tǒng)、數(shù)據(jù)庫等輸出源中。

        SparkStreaming接收實時數(shù)據(jù)流,并以某一時間間隔(batchDuration)劃分為一個個數(shù)據(jù)批次(batch)交給Spark Engine處理。SparkStreaming的數(shù)據(jù)處理流程如圖5所示。

        Dstream是SparkStreaming中特有的數(shù)據(jù)類型,表示一系列連續(xù)的RDD集合,即數(shù)據(jù)批次集合,存儲方式是Map,對每個批次數(shù)據(jù)的處理實際上是RDD的操作,每個批次的處理邏輯是完全相同的。

        SparkStreaming+Kafka進行流數(shù)據(jù)處理被廣泛采用,本文后續(xù)討論基于spark2.3+kafka0.10展開。

        3? 一種可靠的Kafka消費方案

        3.1? 方案設計

        SparkStreaming通過KafkaUtils.createDirectStream創(chuàng)建數(shù)據(jù)流Dstream,默認情況下enable.auto.commit= true自動提交offset,即對應At most once模式。并且無論StreamingContext是否安全終止,都會出現(xiàn)在一段時間后已消費offset值等于最新offset值,盡管此時數(shù)據(jù)還遠沒有消費完數(shù)據(jù)。具體見方案測試。

        設置enable.auto.commit=false可以手動提交offset更新,如Spark中可通過stream.asInstance-Of[CanCommitOffsets].commitAsync (offsetRanges)來進行數(shù)據(jù)處理完后手動提交更新。需要注意的是,此方法將offsetRanges保存在一個隊列中,只有等consumer獲取下一批次數(shù)據(jù)后才提交offsetRanges。方案測試中通過5次實驗對比進行驗證。具體見方案測試。

        在很多設計方案中將offset更新到zk中存儲。然而zk并不適合大規(guī)模數(shù)據(jù)并發(fā)讀寫,尤其是寫效率不高。Kafka允許多個ConsumerGroup并行讀寫數(shù)據(jù),如果offset全部在zk中管理會影響zk性能,進而影響kafka的leader選舉、集群同步等功能。

        因此,綜合考慮kafka集群性能和數(shù)據(jù)讀寫效率,本文設計實現(xiàn)了一種At least Once方案SEO (Similar Exactly Once),每個ConsumerGroup在本地系統(tǒng)中維護offset信息,KafkaCluster提供維護信息,在不影響讀取效率的情況下趨向于Exactly Once保障。

        SEO方案實現(xiàn)的假設條件是zk不可靠或存在延遲,實現(xiàn)目的是數(shù)據(jù)不可丟失,極端情況下允許數(shù)據(jù)重復。方案的一些專有名詞包括:

        客戶端:運行SparkStreaming程序所在的機器;

        gtoffset文件:客戶端存儲的偏移量文件,文件存儲路徑類似于...groupid/topicname/gtoffset,文件包括groupid消費topicname所有分區(qū)的offset信息。

        偏移量越界:包括低越界、高越界。低越界指的是gtoffset記錄的偏移量信息小于Kafka目前可用的offset最小值,高越界指的是gtoffset記錄的偏移量信息超過Kafka目前最新的offset值。

        方案的實現(xiàn)思路如下。

        (1)在客戶端是否存在gtoffset文件,若不存在,說明groupid是第一次消費Topic,那么按照auto.offset.reset=earliest從當前可用的最小offset讀取數(shù)據(jù);如果存在,說明groupid已經(jīng)消費過Topic,讀取得到offset集合A。

        (2)使用spark-streaming-kafka-0-8中的Kafka?Cluster構建Kafka集群連接,進行偏移量越界判斷。使用getEarliestLeaderOffsets得到Topic的最小可用offset集合M,使用getLatestLeaderOffsets得到Topic的最大可用offset集合N。

        (3)如果A中所有分區(qū)的offset都滿足offset_ (M,par)≤offset_(A,par)≤offset_(N,par)那么說明A有效,A不需要更新;如果A中存在分區(qū)的offset滿足offset_(M,par)≥offset_(A,par),即A中有的分區(qū)offset比最小值都小,低越界,那么更新這些offset為M中對應分區(qū)的offset;同樣道理,如果A中存在分區(qū)的offset滿足offset_(A,par)≥offset_(N,par),即A中有的分區(qū)offset比最大值都大,高越界,那么更新這些offset為N重對應分區(qū)的offset。

        (4)解決偏移量越界后,使用更新后A集合拉取Kafka中的數(shù)據(jù)進行處理,處理成功后將最新offset信息寫入到gtoffset文件中。因為offset更新到本地文件,無需與zk、kafka等建立外部連接,可以保證更新效率,程序異常也可控制,所以該方案可以類似實現(xiàn)Exactly once傳輸保障。

        3.2? 方案測試

        本次測試共包括3次試驗,3次實驗環(huán)境完成相同,軟硬件環(huán)境如下。

        第一次實驗為enable.auto.commit=true,此時存在數(shù)據(jù)丟失情況,且出現(xiàn)offset更新為最大值的bug。實驗過程是:topic=test1共4個分區(qū),寫入100006條記錄,設定程序時間間隔為2 s,每秒每分區(qū)最大讀取50條記錄,過10 s時間后停止spark程序,此時數(shù)據(jù)沒有處理完,但是已消費offset(CURRENT-OFFSET)已達到最大值(LOG-END- OFFSET),具體結果見圖6。再次啟動程序后沒有數(shù)據(jù)可讀,數(shù)據(jù)丟失。經(jīng)過10次修改時間間隔和處理數(shù)據(jù)條數(shù),都復現(xiàn)同樣的問題。

        第二次實驗為通過CanCommitOffsets手動提交偏移量,共包括5次驗證。實驗過程是:topic=test2共4個分區(qū),寫入100000條記錄,測試5次,每次修改批次間隔和每分區(qū)每秒最大讀取消息數(shù),每次在消費過程中終止spark程序一次,然后重啟程序直到消費完數(shù)據(jù)。得到的實驗結果如表1。

        通過實驗結果發(fā)現(xiàn)每次均存在重復消費,重復消費的數(shù)量等于分區(qū)數(shù)、間隔時間、每分區(qū)每秒最大消息數(shù)三者的乘積(假設在消費過程中只有一次終止)。

        第三次實驗為通過SEO方案手動提交偏移量,共包括5次驗證。實驗過程是:topic=test3共4個分區(qū),寫入100000條記錄,測試5次,每次修改批次間隔和每分區(qū)每秒最大讀取消息數(shù),每次在消費過程中通過ssc.stop(true, true)安全終止spark流程序一次,然后重啟程序直到消費完數(shù)據(jù)。得到的實驗結果如表2。

        通過實驗結果發(fā)現(xiàn)每次均不存在重復消費也不存在數(shù)據(jù)丟失,整個實現(xiàn)過程中沒有頻繁與第三方數(shù)據(jù)源進行交互,達到了數(shù)據(jù)不丟失的目的,近似實現(xiàn)了Exactly Once模式。當然,在極端情況下,如果某個批次數(shù)據(jù)已經(jīng)處理結束(如導入到數(shù)據(jù)庫中)后,即使安全終止spark任務也未能更新本地gtoffset文件,此時重啟spark任務會出現(xiàn)數(shù)據(jù)重復消費的問題。

        4? 結束語

        互聯(lián)網(wǎng)飛速發(fā)展,數(shù)據(jù)質(zhì)量和數(shù)據(jù)價值最大化是每個互聯(lián)網(wǎng)企業(yè)和傳統(tǒng)企業(yè)都需要考慮的問題,數(shù)據(jù)存儲與計算的并發(fā)性、實時性導致的產(chǎn)品性能優(yōu)劣直接影響了用戶的體驗。本文首先介紹新型流式數(shù)據(jù)處理框架SparkStreaming與Kafka的數(shù)據(jù)消費結合,提出了一種基于SparkStreaming讀取Kafka的近似Exactly Once方案實現(xiàn)并搭建集群環(huán)境繼續(xù)數(shù)據(jù)準確性驗證。

        參考文獻

        [1] 趙旭劍, 鄧思遠, 李波, 等. 互聯(lián)網(wǎng)新聞話題特征選擇與構建[J]. 軟件, 2015, 36(7): 17-20.

        [2] Wang J, Wang W, Chen R. Distributed Data Streams Processing Based on Flume/Kafka/Spark[C]//International Conference on Mechatronics and Industrial Informatics. 2015.

        [3] Ichinose A, Takefusa A, Nakada H, et al. A study of a video analysis framework using Kafka and spark streaming[C]// IEEE International Conference on Big Data. IEEE, 2017: 2396-2401.

        [4] 王巖, 王純. 一種基于Kafka的可靠的Consumer的設計方案[J]. 軟件, 2016, 37(1): 61-66.

        [5] 王鄭合, 王鋒, 鄧輝, 等. 一種優(yōu)化的Kafka消費者/客戶端負載均衡算法[J]. 計算機應用研究, 2017, 34(8): 2306-2309.

        [6] 鄭健, 馮瑞. 基于Spark的實時視頻分析系統(tǒng)[J]. 計算機系統(tǒng)應用, 2017, (12). doi:10.15888/j.cnki.csa.006112.

        猜你喜歡
        越界副本偏移量
        越界·互換·融合——中國化爵士樂的生成路線與認同政治
        文學與文化(2022年4期)2022-03-23 06:20:04
        車門玻璃Y向偏移量對升降系統(tǒng)異響問題的影響
        北京汽車(2022年1期)2022-03-02 06:25:18
        “越界”的第一書記——寶雞市陳倉區(qū)“第一書記聯(lián)盟”成立背景
        當代陜西(2021年2期)2021-03-29 07:40:54
        面向流媒體基于蟻群的副本選擇算法①
        攪拌針不同偏移量對6082-T6鋁合金接頭勞性能的影響
        基于最小二乘平差的全極化SAR配準偏移量估計方法
        測繪工程(2017年3期)2017-12-22 03:24:50
        副本放置中的更新策略及算法*
        陣列方向圖綜合中PSO算法粒子越界處理研究
        樹形網(wǎng)絡中的副本更新策略及算法*
        越界婚姻的倫理窘境:評史密斯《南街》
        精品亚洲一区二区三区四区五区| 亚洲精品成人专区在线观看| 天天天综合网| 无遮挡很爽视频在线观看| 亚洲中文字幕国产剧情| 国语对白在线观看免费| 国产激情久久久久影院小草| 少妇扒开毛茸茸的b自慰| 久久久午夜精品福利内容| 国产欧美日韩视频一区二区三区| 国产在线观看黄| 亚洲视频观看一区二区| 风韵犹存丰满熟妇大屁股啪啪| 欧美肥婆性猛交xxxx| 四虎国产精品免费久久| 日韩欧美亚洲综合久久影院d3| 丝袜美腿网站一区二区| 中文字幕亚洲乱码熟女在线| 中国亚洲av第一精品| 人妖av手机在线观看| 亚洲一区二区三区尿失禁| 国内露脸少妇精品视频| 亚洲欧美日韩一区在线观看| 91久久福利国产成人精品| 偷拍激情视频一区二区| 一区二区三区夜夜久久| 神马影院日本一区二区| 国产精品无码久久综合网| 黄色视频免费在线观看| av超碰在线免费观看| 久久精品国产亚洲av热九九热| 国产精品女同av在线观看| 女人被狂躁的高潮免费视频| 爽爽精品dvd蜜桃成熟时电影院| 妺妺窝人体色www在线图片| 亚洲欧洲国无码| 国产免费人成视频在线| 久久久亚洲欧洲日产国码aⅴ| 人妻丰满熟妇av无码片| 免费毛片在线视频| 少妇人妻中文字幕在线|