Java中的ThreadPoolExecutor線程池

摘要:今天簡單來和大家一起學習一下,java中的ThreadPoolExecutor線程池。線程池簡介背書中,線程池是一個并發框架,在初始化一個多線程應用程序過程中創立一個線程集合,而后在需要執行新的任務時重用這些線程而不是新建一個線程。正當使用線程池有以下幾個好處1.線程池可以利用已經創立好的線程重復執

今天簡單來和大家一起學習一下,java中的ThreadPoolExecutor線程池。

線程池簡介

背書中,線程池是一個并發框架,在初始化一個多線程應用程序過程中創立一個線程集合,而后在需要執行新的任務時重用這些線程而不是新建一個線程。

正當使用線程池有以下幾個好處

1.線程池可以利用已經創立好的線程重復執行任務,避免了線程創立、銷毀帶來的系統資源上的開支。所以線程池減少了系統資源消耗
2.有任務時,線程池可以利用空閑的線程去執行任務,提高了系統的響應速率
3.使用線程可以提高系統的并發解決能力,但是不正當的創立、使用線程會嚴重損耗系統的資源,線程的上下文環境切換也會帶來系統資源上的開支,使用線程池可以對線程進行統一的分配、調優、監控

線程池的實現原理

前面我已經在《并行執行任務的Fork/Join框架》一文中,給大家詳情了ForkJoinPool線程池,今天就以ThreadPoolExecutor為我們的豬腳吧,先舉個栗子,而后在根據ThreadPoolExecutor使用,和大家一起分析源碼。

舉個栗子

/** * @Description: . * @Author: ZhaoWeiNan . * @CreatedTime: 2017/8/20 . * @Version: 1.0 . */public class Demo {    /**     * 初始化線程池     */    private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue(10),            new ThreadPoolExecutor.DiscardOldestPolicy());    public static void main(String[] args) {        Demo demo = new Demo();        for (int i = 0; i < 20; i++) {            final int time = i;            //把線程增加到線程池中            demo.threadPool.execute(new Runnable() {                @Override                public void run() {                    System.out.println("任務索引是:" + time);                    try {                        Thread.sleep(2000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            });        }    }}
1.構造函數

通過構造函數來看看ThreadPoolExecutor的成員變量。

/**     * 創立新線程的工廠。     *     */    private volatile ThreadFactory threadFactory;    /**     * 當線程池中線程數量或者者線程池中止時被調用的解決類.     */    private volatile RejectedExecutionHandler handler;    /**     * 空閑線程等待工作的超時時間     */    private volatile long keepAliveTime;    /**     * 默認為false: 核心線程即便在空閑的時候也保持存活     *       true: 核心線程使用keepAliveTime去等待工作     */    private volatile boolean allowCoreThreadTimeOut;    /**     * 核心線程池的大小,保持存活的工作線程的最小數量。     */    private volatile int corePoolSize;    /**     * 線程池的最大容量     */    private volatile int maximumPoolSize;    /**     * 線程池中的任務隊列,是用了阻塞隊列來實現(有機會為大家詳情)     */    private final BlockingQueue<Runnable> workQueue;    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||                maximumPoolSize <= 0 ||                maximumPoolSize < corePoolSize ||                keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }
2.看看把線程加入線程池的execute方法
 public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /**         * 按3個步驟走:         */        int c = ctl.get();        /**         * 1.假如少于核心線程池大小corePoolSize的線程處于RUNNING狀態,         * 開一個新的線程,把command對象作為該線程的第一個任務。         * 調用addWorker方法,原子性操作判斷command的狀態和worker的數量         * 防止在線程池不能增加線程的時候增加線程。         */        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        /**         * 2.假如一個任務成功加入到阻塞隊列中,增加一個線程,還要進行第二次檢查。         * 由于一已經存在的線程可以能已經死亡,或者者這個線程池已經關閉當要增加線程時。         * 所以我們進行了第二次檢查狀態,進行必要的入隊回滾,或者者池中沒有線程時去         * 創立一個新的線程。         */        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        /**         * 3.假如任務不能入隊,我們嘗試創立一個新的線程,假如創立線程失敗可,         * 我們知道了,線程池現在是關閉狀態或者者是飽和狀態,所以我們拒絕了這個任務。         */        else if (!addWorker(command, false))            reject(command);    }
3.看看addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            //獲取狀態            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            // 檢查狀態,檢查隊列和任務能否為空            if (rs >= SHUTDOWN &&                    ! (rs == SHUTDOWN &&                            firstTask == null &&                            ! workQueue.isEmpty()))                return false;            for (;;) {                //獲取worker的數量                int wc = workerCountOf(c);                //判斷worker的數量                if (wc >= CAPACITY ||                        wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                if (compareAndIncrementWorkerCount(c))                    break retry;                //重新獲取一次狀態                c = ctl.get();                if (runStateOf(c) != rs)                    continue retry;                //因為worker數量改變導致CAS(原子操作)失敗,重新進入內部循環            }        }        boolean workerStarted = false;        boolean workerAdded = false;        //頂一個一個worker對象        Worker w = null;        try {            //創立一個可重入鎖            final ReentrantLock mainLock = this.mainLock;            //利用傳入的任務對象,初始化worker對象            w = new Worker(firstTask);            //創立一個對象            final Thread t = w.thread;            if (t != null) {                //加鎖                mainLock.lock();                try {                    // 在獲取到鎖的情況下,再次檢查狀態                    int c = ctl.get();                    int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                            (rs == SHUTDOWN && firstTask == null)) {                        //先檢查線程t能否已經start                        if (t.isAlive())                            throw new IllegalThreadStateException();                        //加入到workers中                        //看一眼 workers                        // private final HashSet<Worker> workers = new HashSet<Worker>();                        // workers保存了所有已經獲取到了mainLock鎖的worker對象                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }
4.看看Worker這個類
worker

Worker是ThreadPoolExecutor中的一個內部類,實現了Runnable接口,是一個線程類,主要使用來,執行加入到線程池中的任務

線程池的監控

線程池有少量屬性,可以支持我們對線程池進行了少量監控,在出現問題的時候可以很方便的進行問題的定位:

    /**     * 線程池里曾經創立過的最大線程數,假如該數等于線程池的大小,證實該線程池曾經被打滿過     */    private int largestPoolSize;    /**     * 線程中已經完成的任務數量     */    private long completedTaskCount;    /**     * 獲取線程池中活動的線程數     */    public int getActiveCount() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            int n = 0;            for (ThreadPoolExecutor.Worker w : workers)                if (w.isLocked())                    ++n;            return n;        } finally {            mainLock.unlock();        }    }    /**     * 線程池的線程數量     */    public int getPoolSize() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            // Remove rare and surprising possibility of            // isTerminated() && getPoolSize() > 0            return runStateAtLeast(ctl.get(), TIDYING) ? 0                    : workers.size();        } finally {            mainLock.unlock();        }    }

另外,線程池中,beforeExecute、afterExecute、terminated方法,都是空方法,可以重寫這三個方法,在執行任務前、執行任務后、線程池關閉時執行代碼來進行線程池的監控,例如可以統計任務執行時間等

    protected void beforeExecute(Thread t, Runnable r) { }    protected void afterExecute(Runnable r, Throwable t) { }    protected void terminated() { }

正當配置線程池

線程池的怎樣配置,可能是一直困擾著我和大家的一個問題,具體正當配置我一直沒有實踐的做過試驗,都是使用了度娘到的大神給的配置建議

CPU密集型任務

CPU密集型任務盡量配置少的線程數,一般為CPU個數 + 1

IO密集型任務

IO密集型任務線程并不是一直在執行任務,線程數可以設置多一點,一般為CPU個數的兩倍

最后套用阿里巴巴Java開發手冊中的一句話

阿里開發規約

ThreadPoolExecutor就為大家簡單的說到這,歡迎大家來交流,指出文中少量說錯的地方,讓我加深認識,愿大家沒有bug,謝謝!

  • 全部評論(0)
最新發布的資訊信息
【系統環境|】學習web前端開發是正確的選擇(2019-09-15 16:55)
【系統環境|】干貨整理!零基礎html5網站開發學習步驟方法(保存不后悔)(2019-09-10 16:27)
【系統環境|】「前端入門」前端基本概念(2019-09-07 21:36)
【系統環境|】小白入門學習web前端,這些干貨不能少(2019-09-05 20:59)
【系統環境|】不是計算機專業,哪個專業更適合學習web前端(2019-09-03 20:31)
【系統環境|】入行web前端開發可以做什么工作(2019-09-02 20:51)
【系統環境|】什么是Web前端呢?為什么說web前端開發人員的薪資高、前景好呢?(2019-08-31 20:55)
【系統環境|】2019年Web前端開發的8個趨勢,你知道幾個?(2019-08-29 16:23)
【系統環境|】學習web前端,掌握這些,才有底氣跟面試官提薪資(2019-08-28 15:23)
【系統環境|】Web前端為什么那么好(2019-08-27 18:20)
手機二維碼手機訪問領取大禮包
返回頂部
双色球号码300期遗传走势图