亚洲免费av电影一区二区三区,日韩爱爱视频,51精品视频一区二区三区,91视频爱爱,日韩欧美在线播放视频,中文字幕少妇AV,亚洲电影中文字幕,久久久久亚洲av成人网址,久久综合视频网站,国产在线不卡免费播放

        ?

        采用ScheduledThreadPoolExecutor執(zhí)行定時重試任務時內(nèi)存溢出的分析及解決

        2016-05-14 11:20:05余志堅姜春志
        科技資訊 2016年7期
        關鍵詞:入隊線程調(diào)用

        余志堅 姜春志

        摘 要:開發(fā)JavaWeb項目中發(fā)現(xiàn)服務之間的調(diào)用存在超時情況,由于涉及的處理邏輯全部是異步,引入定時重試的機制,重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。當A服務依賴B服務,B服務由于在業(yè)務高峰期處理能力降低,導致大量A服務過來的請求超時,A加入了超時重試機制,間隔時間根據(jù)重試次數(shù)的多少來決定,次數(shù)越多,兩次重試之間間隔的時間越多,此時的業(yè)務高峰也會給A帶來大量請求,大量的超時會導致重試隊列迅速堆積,直到內(nèi)存溢出。該文從線程池工作機制、ScheduledThreadPoolExecutor實例的創(chuàng)建,獲取重試任務的過程以及提交任務的過程角度分析,并通過源代碼的剖析和測試工具MyEclipse進行演示測試內(nèi)存泄露的情況,得出避免內(nèi)存泄露的解決方案。

        關鍵詞:ScheduledThreadPoolExecutor 線程池 內(nèi)存溢出

        中圖分類號:TP3 文獻標識碼:A 文章編號:1672-3791(2016)03(a)-0015-03

        1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程及線程池工作機制

        1.1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程

        重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:(1)獲取當前機器上處理器的數(shù)量;(2)使用Google的ThreadFactoryBuiler創(chuàng)建指定格式名稱的線程,以方便查看問題;(3)有需要被拒絕的任務時,拋出異常;(4)創(chuàng)建定時任務池;打開MyEclipse工具顯示相對的代碼:int corePoolSize=Runtime.getRuntime().availableProcessors();

        ThreadFactory tf=new ThreadFactoryBuilder().setNameFormat("FailureRetryTask-pool-%d").build();

        RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();

        ScheduledThreadPoolExecutor taskService=new ScheduletThreadPooExecutor(corePoolSize,tf,handler);

        線程池就是多個線程在一個隊列中取任務執(zhí)行,提交的任務會被放入隊列中等待線程執(zhí)行,故隊列要設置一個大小。線程池同樣會根據(jù)任務繁忙程度來動態(tài)調(diào)整連接數(shù),空閑時保持最小連接數(shù),繁忙時增加連接,但不會超過上限,具有伸縮性,線程的創(chuàng)建和銷毀也需要消耗系統(tǒng)資源,線程的連接重用就可以避免這部分損失,具有重用性。

        1.2 線程池工作機制

        線程獲取任務的策略就是如果當前線程池運行狀態(tài)正常,則阻塞等待任務,否則直接返回或等待有限時間后返回。線程池中線程的主要任務就是獲取任務,然后執(zhí)行,然后再去獲取任務,如此循環(huán),這就實現(xiàn)了線程池中線程的可重用。

        Worker封裝了任務,同時創(chuàng)建了新的線程,并被添加到集合workers中,這個workers其實就是最核心的線程池。通過run方法實現(xiàn)重用。private final HashSet workers=new HashSet();

        public void run(){

        try{

        Runnable task=firstTask;

        firstTask=null;

        while(task!=null||(task=getTask())!=null){

        runTask(task);

        task=null;}}

        finally{

        workerDone(this);

        }

        }

        Runnable getTask(){

        for(;;){

        try{

        int state=runState;

        if(state>SHUTDOWN){return null;}

        Runnable r;

        if(state==SHUTDOWN){r=workQueue.poll();}

        else if(poolSize>corePoolSize||allowCoreThreadTimeOut){

        r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);

        }else{r=workQueue.take();}

        if(r!=null){return r;}

        if(workerCanExit()){

        if(runState>=SHUTDOWN){interruptIdleWorkers();}

        return null;

        }

        }

        catch(InterruptedException ie){}

        }

        }

        private boolean workerCanExit(){

        final RenntrantLock mainLock=this.mainLock;

        mainLock.lock();

        boolean canExit; try{canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));

        }finally{mainLock.unLock();}

        return canExit;

        }

        如果此時線程池運行狀態(tài)是終止(runState >= STOP),或者隊列為空,或者允許核心線程超時并且線程池中線程數(shù)量大于最小線程數(shù)量,那么方法將返回true。再回到getTask方法,調(diào)用workerCanExit方法的前提是沒有獲取到任務,根據(jù)上邊獲取任務的過程,這幾個條件都有可能成立,所以此時getTask方法可以返回null,上層Worker的run方法從while循環(huán)重返回,整個線程結束,這就實現(xiàn)了線程池的可伸縮。

        2 ScheduledThreadPoolExecutor獲取任務的過程

        在getTask()中,描述了整個獲取任務的過程,如果線程池運行狀態(tài)已經(jīng)是SHUTDOWN了,調(diào)用非阻塞方法poll,因為如果當前有任務,那么可以獲取到任務并返回,如果沒有任務,也沒有必要阻塞在隊列上等待任務,因為已經(jīng)SHUTDOWN,后續(xù)不會再有任務進入。

        如果當前線程數(shù)大于最小線程數(shù),或者核心線程也可以做超時處理,意味著如果獲取不到任務就可以銷毀一部分線程了,所以poll方法設置了等待時間,超時后立即返回。

        另一種情況是線程池還在運行狀態(tài),并且當前線程數(shù)不大于最小線程數(shù),同時也不允許最小線程數(shù)以內(nèi)的線程超時,這個時候線程就要調(diào)用阻塞方法take,等待任務進入隊列以后才返回。

        3 ScheduledThreadPoolExecutor提交任務的執(zhí)行過程

        ScheduledThreadPoolExecutor提交任務的執(zhí)行過程,首先提交任務:taskService .schedule(new Runnable(){public void run(){}},1,TimeUnit,DAYS);

        ScheduledThreadPoolExecutor通過schedule方法提交定時任務,schedule方法源碼如下:

        public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){

        if(command==null||unit==null){throw new NullPointerException();}

        if(delay<0){delay=0;}

        RunnableScheduledFuture<?> t=decorateTask(command,new ScheduledFutureTask(command,null,triggerTime));

        delayedExecute(t);

        return t;

        }

        提交的任務會被封裝成ScheduledFutureTask類型對象。

        分析delayedExecute方法:private void delayedExecute(Runnable command){

        if(isShutDown()){reject(command);return;}

        if(getPoolSize()

        super.getQueue().add(command);

        }

        如果線程的運行狀態(tài)不是RUNNNING或者入隊列沒有成功,則采用線程池的構造方法中設置的拒絕策略來處理任務。

        如果當前線程池中的線程數(shù)量poolSize小于線程池核心線程的數(shù)量corePoolSize,執(zhí)行prestartCoreThread(),該方法會創(chuàng)建一個新的線程來執(zhí)行任務,如果prestartCoreThread()創(chuàng)建的新線程執(zhí)行任務失敗或者當前線程池中的線程數(shù)量poolSize大于等于線程池核心線程數(shù)量corePoolSize,當若線程池的運行狀態(tài)是RUNNING并且入隊成功,由于在多線程環(huán)境下,狀態(tài)隨時可能會改變,此時線程池的運行狀態(tài)runState不是RUNNING或者線程池中沒有可用的線程(poolSize==0),要確保進入的任務被執(zhí)行處理,線程池在初始化完成以后是空的,并沒有線程,如果在服務器中使用線程池,服務重啟后有大量請求進入,則要同時創(chuàng)建多個線程,而且創(chuàng)建過程是加鎖同步的,會導致一定的競爭,解決辦法就是線程池初始化時調(diào)用prestartAllCoreThreads方法啟動核心線程數(shù)量的線程,這樣就能在線程池中的線程就緒以后才開始接收請求。

        通過getQueue方法獲取任務隊列,并且調(diào)用add方法向隊列中添加任務,dq的定義:

        private final DelayQueue dq=new DelayQueue();

        public boolean add(Runnable x){return dq.add((RunnableScheduleFuture)x);}

        可以看出dq是阻塞隊列,線程池中的線程都是在隊列中取數(shù)據(jù),ScheduledThreadPoolExecutor中的構造方法里的隊列的實現(xiàn)使用鏈表結構的阻塞隊列,add方法內(nèi)部調(diào)用offer方法,offer源碼如下:public boolean offer(E e){

        final ReentrantLock lock=this.lock();

        lock.lock();

        try{

        E first=q.peek();

        q.offer(e);

        if(first==null||e.compareTo(first)<0){

        available.singalAll();

        return true; }

        }finally{lock.unlock();}}

        這方法需要在多線程環(huán)境下同步執(zhí)行,會用到鎖Lock。鎖實現(xiàn)的大概原理如下。

        Lock實現(xiàn)鎖的方式是通過排他性更新共享變量,更新成功的線程繼續(xù)執(zhí)行,沒有更新成功的線程將會被阻塞。Lock的共享變量state在可重入鎖中可以用來表示一個線程調(diào)用了幾次lock方法,也就是有幾次獲取鎖的行為。Lock的功能實現(xiàn)是通過內(nèi)部聚合了抽象隊列同步器(AQS),同步器有公平和非公平之分。非公平同步器對于新來的線程會嘗試獲取,不成功以后才會進入等待隊列,而公平同步器則會首先判斷是否排隊。AQS中會保存獲取鎖的當先線程的引用。如果一次性嘗試獲取鎖不成功,則線程會進入隊列,循環(huán)嘗試獲取鎖。

        peek方法會獲取隊列的第一個元素,只是獲取,并沒有出隊列。接著調(diào)用優(yōu)先級隊列PriorityQueue類型變量q的offer方法將隊列入隊,優(yōu)先級隊列會對任務進行排序,距離執(zhí)行時間越近,位置越靠前。下邊的if判斷可以這樣理解,first是在當前任務入隊之前獲取的,也就是隊列中原有的第一個任務,compareTo的這段比較是說當前任務的執(zhí)行時間比隊列中第一個任務執(zhí)行時間還要早,如果first是null,那么當前任務入隊后將是第一個元素,如果當前任務的執(zhí)行時間比隊列中第一個任務的執(zhí)行時間早,那么當前入隊后也將是第一個元素,只要這兩個條件有一個成立了,這個if的判斷條件就為true,就要執(zhí)行Condition類型的available變量的signalAll方法,喚醒等待的線程工作。

        4 隊列的大小判斷

        隊列的大小是決定內(nèi)存溢出最直觀的因素,首先來看看優(yōu)先級隊列PriorityQueue的offer方法:public boolean offer(E e){

        if(e==null){throw new NullPointerException();}

        modCount++;

        int i=size;

        if(i>=queue.length){grow(i+1);}

        size=i+1;

        if(i==0){queue[0]=e;}

        else{siftUp(i,e);}

        return true;

        上述代碼表示如果隊列中元素的個數(shù)(size)大于等于隊列的長度,將要通過grow方法擴容,如下:private void grow(int minCapacity){

        if(minCapacity<0){throw new OutOfMemoryError();}

        int oldCapacity=queue.length;

        int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2):((oldCapacity/2)*3));

        if(newCapacity<0){newCapacity=Integer.MAX_VALUE;}

        if(newCapacity

        queue=Arrays.copyof(queue,newCapacity);

        }

        若隊列容量小于64,那就在原有基礎上加1然后擴大2倍,這種情況絕對不會造成內(nèi)存的溢出問題。如果大于等于64呢?直接擴容一半,然后將值賦給一個int型變量,當某種情況如果超過int類型的最大值了,JDK的處理是賦值成Integer的MAX_VALUE為2147483647,也就是最大的隊列長度是2 G多,如果一個對象的大小按照50個字節(jié)來算,將會占用100 G的內(nèi)存必定溢出。

        5 模擬內(nèi)存溢出代碼測試

        當業(yè)務高峰給服務器帶來大量請求,大量的超時會導致重試隊列迅速堆積,直到內(nèi)存溢出,下面就通過代碼來測試一下:模擬大量的添加任務,并且任務在調(diào)度隊列中堆積,推遲一天執(zhí)行。

        while(true){

        taskService.schedule(new Runnable(){public void run(){}},1,TimeUnit.DAYS);

        }

        虛擬機啟動參數(shù)-:

        -Xms32M -Xmx32M -Xmn10M-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=d:/

        運行輸出:

        java.lang.OutOfMemoryError:Java heap space Dumping head to d:/\java_pid12884.hprof...

        Heap dump file created [44940425 bytes in 0.618 secs]

        內(nèi)存溢出了,來看看內(nèi)存快照(見表1)。

        6 解決方案及措施

        編譯好的java程序需要運行在JVM中,而JVM為程序提供并管理所需要的內(nèi)存空間,JVM自帶一個用于回收沒有任何引用指向的對象的線程機制(垃圾回收器),但針對于ScheduledThreadPoolExecutor提交的任務會被封裝成ScheduledFutureTask類型對象且每個對象中又有Sync成員變量。解決的辦法可以是手動判斷隊列的大小,通過taskService.getQueue().size()方法,通過Jmap內(nèi)存分析工具估算每個對象的大小,Jmap是一個可以輸出所有內(nèi)存中對象的工具,甚至可以將JVM 中的heap,以二進制輸出成文本。打印出某個Java進程內(nèi)存內(nèi)的所有‘對象的情況,結合能夠為隊列分配的內(nèi)存大小,計算出隊列容納任務的最大數(shù)量,以避免內(nèi)存溢出。

        參考文獻

        [1] 逯昌浩.淺析多核處理器條件下的Java編程[J].中國科技信息,2009(12):128,130.

        [2] 張復興,曾新洲.擴展線程池模型及性能分析[J].計算技術與自動化,2007(4):110-112.

        [3] (美)Bruce Eckel.Java編程思想[M].陳昊鵬,譯.北京:機械工業(yè)出版社,2007.

        猜你喜歡
        入隊線程調(diào)用
        今天我入隊——入隊儀式
        少先隊活動(2022年5期)2022-06-06 03:45:02
        1+1我們這樣學隊章:我們的入隊誓詞
        少先隊活動(2020年7期)2020-08-14 01:17:50
        核電項目物項調(diào)用管理的應用研究
        LabWindows/CVI下基于ActiveX技術的Excel調(diào)用
        測控技術(2018年5期)2018-12-09 09:04:46
        今天我入隊了
        入隊風波
        淺談linux多線程協(xié)作
        基于系統(tǒng)調(diào)用的惡意軟件檢測技術研究
        利用RFC技術實現(xiàn)SAP系統(tǒng)接口通信
        Linux線程實現(xiàn)技術研究
        国产无套一区二区三区久久| 欧美精品久久久久久久自慰| 蜜桃日本免费观看mv| 蜜臀av性久久久久蜜臀aⅴ| 天美麻花果冻视频大全英文版| 青春草国产视频| 2021年国产精品每日更新| aⅴ色综合久久天堂av色综合| 亚洲国语对白在线观看| 国产日本精品视频一区二区| 国产精品视频亚洲二区| 免费视频爱爱太爽了| 躁躁躁日日躁| 亚洲日本无码一区二区在线观看| 国产一区二区在线观看av| 国产一区二区三区再现| 亚洲av熟女少妇一区二区三区| 精品嫩模福利一区二区蜜臀| 欧美怡春院一区二区三区| 又色又爽又黄的视频软件app | 国产美熟女乱又伦av果冻传媒| 亚洲美女性生活一级片| 亚洲国产精品自拍成人| 精品国产天堂综合一区在线| 精品国产一区二区三区免费| 四虎影视久久久免费| yy111111少妇影院| 在线观看视频亚洲一区二区三区| 国产无套中出学生姝| 少妇无码太爽了不卡视频在线看| 国内久久婷婷精品人双人| 国内精品久久久久影院蜜芽| 免费观看在线视频播放| 亚洲丁香婷婷久久一区二区| 日韩吃奶摸下aa片免费观看| 国产精品毛片无码| 国产精品无码mv在线观看| 一级黄色一区二区三区视频| 蜜桃传媒免费观看视频| 精品无人区无码乱码毛片国产| 内射人妻视频国内|