亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        基于Flink框架的TopN堆排序優(yōu)化算法

        2020-03-16 05:17:10魏碧晴
        關(guān)鍵詞:排序

        關(guān) 沫,魏碧晴

        (沈陽工業(yè)大學(xué) 信息科學(xué)與工程學(xué)院,遼寧 沈陽 110870)

        0 引言

        隨著計(jì)算機(jī)技術(shù)和信息科技的快速發(fā)展,全球的數(shù)據(jù)量急劇增長,2015年全球的數(shù)據(jù)總量達(dá)到8.61 ZB,預(yù)估2020年全球的數(shù)據(jù)總量會超過40 ZB。通過移動互聯(lián)網(wǎng)、社交媒體等服務(wù)模式,大數(shù)據(jù)產(chǎn)業(yè)已滲透到人們生活的各個(gè)方面,并且數(shù)據(jù)價(jià)值的時(shí)效性越來越重要,集群必須以毫秒級的延遲從大規(guī)模的數(shù)據(jù)中提煉有價(jià)值的信息[1]。

        TopN問題就是從許多的數(shù)值選出前N個(gè)最大或者最小的數(shù)值有序排好,最常見的應(yīng)用于微博熱搜榜、歌曲人氣榜、投票選舉等。由此可見利用大數(shù)據(jù)技術(shù)和計(jì)算機(jī)技術(shù)能輕松解決傳統(tǒng)排序問題。如微博熱搜榜,需要實(shí)時(shí)更新點(diǎn)擊量并按其從大到小的順序排列。而使用流計(jì)算框架Flink來解決TopN問題可以滿足其實(shí)時(shí)性和低延遲的要求。

        1 Flink框架

        Flink是一個(gè)開源的分布式流式處理框架,它針對數(shù)據(jù)流的分布式計(jì)算提供數(shù)據(jù)分布、數(shù)據(jù)通信和容錯(cuò)機(jī)制等功能[2]。

        1.1 Flink框架的優(yōu)點(diǎn)

        與Storm框架相比,F(xiàn)link能提供Exactly-Once精確性保證、吞吐量高、有背壓處理機(jī)制[3]。與Spark框架相比,F(xiàn)link的延遲更短,是毫秒級別的,在流處理方面,F(xiàn)link的性能比Spark更好。

        此外Flink框架使用Chandy-Lamport算法來做異步分布式快照,其本質(zhì)是checkpoint。checkpoint操作是異步的,不會打斷數(shù)據(jù)的處理,并且是非常輕量的,容錯(cuò)的代價(jià)相對很低。Flink可以做到每條數(shù)據(jù)同步更新watermark,watermark的計(jì)算實(shí)時(shí)性高,輸出延遲低[4]。

        1.2 Flink框架的缺點(diǎn)

        由于Flink是以固定的緩存塊為單位來進(jìn)行的數(shù)據(jù)傳輸,緩存塊的超時(shí)值可以人為規(guī)定,如果緩存塊的超時(shí)值為無限大,則Flink的數(shù)據(jù)傳輸方式就是批處理,此時(shí)系統(tǒng)可以獲得最大吞吐量[5]。如果緩存塊的超時(shí)值為0,F(xiàn)link的數(shù)據(jù)傳輸方式是流處理,此時(shí)雖然系統(tǒng)可以獲得最低的處理延遲,但是吞吐量也會降低[6]。

        本文就基于Flink框架的TopN排序問題提高其吞吐量提出解決方案。

        2 堆排序優(yōu)化算法

        2.1 傳統(tǒng)的堆排序過程

        先將待排序的數(shù)字構(gòu)建成一個(gè)堆(堆是一棵完全二叉樹),如果是要按從小到大的順序排列,就先將這個(gè)堆調(diào)整成大頂堆,堆頂?shù)脑鼐褪亲畲笾?,然后將堆頂元素與末尾的元素交換位置,最大值“沉”到數(shù)組的末端。再將剩余的元素重新調(diào)整成大頂堆,把堆頂元素與倒數(shù)第二個(gè)元素交換位置,重復(fù)以上操作,最后就能得到一個(gè)從小到大排序的數(shù)組了。

        堆排序的難點(diǎn)在于將無序堆調(diào)整成大頂堆,調(diào)整步驟如下:定義一個(gè)數(shù)組arr,設(shè)父節(jié)點(diǎn)是arr[i],那么這個(gè)父節(jié)點(diǎn)的左孩子節(jié)點(diǎn)就是arr[2i+1],它的右孩子結(jié)點(diǎn)就是arr[2i+2]。首先比較左孩子節(jié)點(diǎn)和右孩子節(jié)點(diǎn)的大小,然后將其中較大的孩子節(jié)點(diǎn)與父節(jié)點(diǎn)進(jìn)行比較,如果父節(jié)點(diǎn)小于孩子節(jié)點(diǎn),就需要交換它們的位置,否則就不用交換。從最后一個(gè)非葉子節(jié)點(diǎn)開始,從左至右,從下至上進(jìn)行調(diào)整。

        而TopN問題只需要選出前N個(gè)最大數(shù)即可,所以不需要把后面那些很小的數(shù)值也排出來。為了降低程序時(shí)間復(fù)雜度和空間復(fù)雜度,可以在原有的堆排序基礎(chǔ)上加以改進(jìn)。定義初始化一個(gè)長度為N,元素都為0的數(shù)組arr。然后用迭代器的it.hasNext()方法判斷是否有待排序的元素,如果有就用it.next()方法接收待排序的元素x。把接收的元素x與arr數(shù)組的第一個(gè)元素進(jìn)行比較,如果接收的元素x大于arr[0],就用x替換arr[0],然后把a(bǔ)rr數(shù)組里的數(shù)字用大頂堆按從小到大的順序排列。利用迭代器依次接收待排序的元素,如此循環(huán)往復(fù),最終就能選出最大的N個(gè)數(shù)值。

        2.2 HeapOptimize優(yōu)化算法

        上文所描述的TopN問題的解決方法只能一個(gè)一個(gè)地接收數(shù)據(jù),數(shù)據(jù)的吞吐量較小,不具備處理大數(shù)據(jù)的能力,所以還需要繼續(xù)改進(jìn)這個(gè)算法。把圖1中的TopN的解決步驟取名為Process,改進(jìn)后的算法取名為HeapOptimize,該算法的步驟如圖1所示。

        圖1 HeapOptimize結(jié)構(gòu)圖

        箭頭表示數(shù)據(jù)流Stream的走向,最開始的Process算子接收并處理數(shù)據(jù)流,每一個(gè)Process從接收到的數(shù)據(jù)挑選出前N個(gè)最大的數(shù),然后把這N個(gè)數(shù)傳給下一層的Process去處理。第二層的Process把第一層所有Process選出的較大值再進(jìn)行挑選排序,把最終的結(jié)果傳給Sink。圖中以兩層Process為例展示其算法過程,也可以擴(kuò)展為更多層的Process來處理數(shù)據(jù),增加數(shù)據(jù)的吞吐量。可以根據(jù)需求,選擇合適的Process層數(shù),快速處理大量的數(shù)據(jù)。

        本文目前實(shí)現(xiàn)兩層的Process解決TopN問題,TopN作業(yè)在Flink框架上運(yùn)行成功的算子拓?fù)鋱D如圖2所示。先將Source數(shù)據(jù)源里的數(shù)據(jù)通過FlatMap把String類型格式轉(zhuǎn)化成int類型,便于后面的計(jì)算。

        下一步要做窗口計(jì)算,在定義窗口之前,要指定流是否需要keyed,使用keyBy(“word”)將無界流分成邏輯的keyed stream。擁有keyed stream將允許窗口計(jì)算由多個(gè)任務(wù)并行執(zhí)行,因?yàn)槊總€(gè)邏輯key流都可以獨(dú)立于其余任務(wù)進(jìn)行處理,key相同的元素將被發(fā)送到同一個(gè)任務(wù)。這里采用的是滾動窗口,沒有時(shí)間重疊,窗口周期是1 s。滾動窗口分配器將每個(gè)元素分配給固定窗口大小的窗口。例如,如果指定大小為1 s的滾動窗口,則將執(zhí)行當(dāng)前窗口,并且每1 s鐘將啟動一個(gè)新窗口,如圖3所示。

        圖2 算子拓?fù)鋱D

        圖3 滾動窗口示意圖

        在這里可以設(shè)置Process的并行度(即一層Process的個(gè)數(shù))。如果需要多層Process計(jì)算,可以重復(fù)從keyBy分流到Process這些步驟。最后需要把數(shù)據(jù)都集中到一個(gè)Process算子作業(yè)來排序,再把計(jì)算結(jié)果輸出到Sink里打印最終結(jié)果。算子拓?fù)鋱D的第三個(gè)算子是窗口計(jì)算和第一次Process排序合在一起的。

        2.3 Process的層數(shù)

        假如一分鐘需要處理v條數(shù)據(jù),一個(gè)Process算子一分鐘最多能處理m條數(shù)據(jù),數(shù)據(jù)通過一個(gè)Process算子處理后得到N條數(shù)據(jù)傳輸?shù)较乱粚?,如果?nèi)存和CPU足夠大,設(shè)x層需要執(zhí)行Process算子任務(wù)的個(gè)數(shù)(Process的并行度)是px,最后一層的并行度一定是1(因?yàn)樽詈笮枰y(tǒng)計(jì)出N條最大的數(shù)據(jù)),前幾層的Process的并行度不固定,但有一個(gè)上限,假設(shè)前n-1層的Process的并行度取最大值,則可以得出層數(shù)與數(shù)據(jù)量的關(guān)系。

        如果只有一層Process,該P(yáng)rocess的并行度也為1,需要滿足的條件是v≤m;

        3 實(shí)驗(yàn)結(jié)果與分析

        3.1 實(shí)驗(yàn)環(huán)境

        實(shí)驗(yàn)搭建的集群是由兩臺PC組成,由分布式隊(duì)列Kafka作為數(shù)據(jù)源點(diǎn)生產(chǎn)數(shù)據(jù),F(xiàn)link提供FlinkKafkaConsumer接口從Kafka取數(shù)據(jù),交給后面的operator來處理。用TaskManager節(jié)點(diǎn)構(gòu)建整個(gè)算子拓?fù)?,將?jì)算結(jié)果保存在HDFS中,以Zookeeper作為集群的同步協(xié)調(diào)節(jié)點(diǎn)負(fù)責(zé)分布式節(jié)點(diǎn)間的信息同步。集群分布情況如表1所示。

        表1 集群節(jié)點(diǎn)分布信息

        由于實(shí)驗(yàn)條件有限,TaskManager節(jié)點(diǎn)只創(chuàng)建了一個(gè),當(dāng)然TaskManager節(jié)點(diǎn)數(shù)越多越好。如果條件允許也可再創(chuàng)建一個(gè)JobManager節(jié)點(diǎn)作備用。集群節(jié)點(diǎn)的配置參數(shù)如表2所示。

        表2 節(jié)點(diǎn)配置參數(shù)

        3.2 對比實(shí)驗(yàn)與分析

        在一開始執(zhí)行作業(yè)的時(shí)候,Kafka開始生產(chǎn)數(shù)據(jù),Process算子的吞吐量從0開始慢慢變大到2 000。當(dāng)Kafka生產(chǎn)的數(shù)據(jù)量特別大,一個(gè)Process算子接收數(shù)據(jù)的速率高于它在一個(gè)瞬時(shí)脈沖內(nèi)處理的數(shù)據(jù),F(xiàn)link系統(tǒng)就會出現(xiàn)背壓,把數(shù)據(jù)量壓到底層算子Source上,數(shù)據(jù)流降速,并行度為1的Process算子的吞吐量也會變小,由于數(shù)據(jù)源的多樣性和輸入速率的變化以及集群的不穩(wěn)定性,使得Process算子的吞吐量在某個(gè)范圍波動,如圖4所示,該算子的吞吐量大概穩(wěn)定在1 500左右,處理數(shù)據(jù)的速度較為緩慢。

        圖4 Process的吞吐量v0

        此時(shí)若把該算子的并行度改為3,讓三個(gè)Process算子同時(shí)處理從數(shù)據(jù)源傳來的數(shù)據(jù)流,然后把這三個(gè)算子處理后的數(shù)據(jù)合流到下一個(gè)Process集中處理,也可以完成TopN作業(yè)。這三個(gè)Process的吞吐量如圖5、圖6、圖7所示,一個(gè)算子的吞吐量能達(dá)到2 000,v1+v2+v3>v0,把Process算子的并行度提高到3之后,整個(gè)TopN作業(yè)的吞吐量提高了約7 000條,TopN作業(yè)的吞吐量提高了。由此可見,當(dāng)算子出現(xiàn)性能瓶頸時(shí),增加Process的并行度可以增加整個(gè)排序作業(yè)的數(shù)據(jù)處理速度,解決Flink的背壓問題,同時(shí)也增加了排序作業(yè)占用的內(nèi)存和CPU。

        圖5 Process的吞吐量v1

        圖6 Process的吞吐量v2

        圖7 Process的吞吐量v3

        4 結(jié)論

        本文是將傳統(tǒng)的堆排序進(jìn)行優(yōu)化,用以處理TopN問題,把HeapOptimize算法放在Flink框架去執(zhí)行,把待處理的海量數(shù)據(jù)分割成小塊的數(shù)據(jù)來處理,用化整為零的思想處理數(shù)據(jù),把第一層的Process算子的處理結(jié)果傳到下一層Process進(jìn)行處理,這種方法比傳統(tǒng)方法的計(jì)算速度快,吞吐量增大。Process的層數(shù)根據(jù)數(shù)據(jù)量來設(shè)定,由數(shù)據(jù)量大小計(jì)算出理想的Process層數(shù)。把Flink資源合理地分配利用,最大限度地提高Flink吞吐量。

        猜你喜歡
        排序
        排排序
        排序不等式
        作者簡介
        名家名作(2021年9期)2021-10-08 01:31:36
        作者簡介
        名家名作(2021年4期)2021-05-12 09:40:02
        恐怖排序
        律句填空排序題的備考策略
        節(jié)日排序
        刻舟求劍
        兒童繪本(2018年5期)2018-04-12 16:45:32
        作者簡介(按文章先后排序)
        名家名作(2017年2期)2017-08-30 01:34:24
        按特定規(guī)律排序
        兒童與健康(2012年1期)2012-04-12 00:00:00
        日本高清在线一区二区| 久久亚洲精品中文字幕| 45岁妇女草逼视频播放| 精品亚洲a∨无码一区二区三区| 色一情一乱一乱一区99av| 香蕉成人啪国产精品视频综合网 | 国产av国片精品jk制服| 中文字幕熟妇人妻在线视频| 中文无码成人免费视频在线观看| 黄色录像成人播放免费99网| 国产精品国产三级国产an不卡| 中文字幕av伊人av无码av| 一区二区传媒有限公司| 在线成人tv天堂中文字幕| 亚洲一区二区不卡日韩| 国产又大大紧一区二区三区| 亚洲日韩小电影在线观看| 亚洲国产欧美日韩欧美特级| 亚洲高潮喷水无码av电影| 亚洲深夜福利| 久久久国产视频久久久| 可免费观看的av毛片中日美韩| 国产亚洲真人做受在线观看| 久久亚洲中文字幕无码| 国产精品美女一级在线观看| 男男啪啪激烈高潮无遮挡网站网址| 性色欲情网站| 亚洲AV无码一区二区三区人| 国产一区二区三区影片| 国产精品国产三级第一集| 最新国产精品久久精品| 国产精品福利视频一区| 久久国产精品免费一区六九堂| 亚洲中文字幕第15页| 一本久久a久久精品vr综合| 久久狠狠第一麻豆婷婷天天| 国产美女冒白浆视频免费| 无码乱肉视频免费大全合集| 国产色诱视频在线观看| 青青草免费激情自拍视频| 国产精品午夜夜伦鲁鲁|