简单线程池实现原理

简单线程池实现原理直接先看代码。packageresourse.treadPool;/***线程池定义接口*/publicinterfaceThreadPoolJobextendsRunnable{/**执行一个job*/voidexecute(Jobjob);/**关闭线程池*/voidshutdown();/**增加工作者线程*

大家好,欢迎来到IT知识分享网。简单线程池实现原理"

直接先看代码。
package resourse.treadPool;
/** * 线程池定义接口 */
public interface ThreadPool<Job extends Runnable> {

    /** 执行一个job*/
    void execute(Job job);

    /** 关闭线程池*/
    void shutdown();

    /**增加工作者线程*/
    void addWorkers(int num);

    /**减少工作者线程*/
    void removeWorker(int num);

    /** 得到正在等待执行的任务数量*/
    int getJobSize();
}

package resourse.treadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** * 线程池简单实现 * @param <Job> */
public class DefaultTreadPool<Job extends Runnable> implements ThreadPool<Job> { 
   

    //线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    //线程池最少数量
    private static final int MIN_WORKER_NUMBERS = 1;

    //工作列表
    private LinkedList<Job> jobs = new LinkedList<Job>();
    //工作者列表
    private List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    //工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultTreadPool(){
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }

    private void DefaultTreadPool(int num) {
        workerNum = num>MAX_WORKER_NUMBERS?MAX_WORKER_NUMBERS:num<MIN_WORKER_NUMBERS?MIN_WORKER_NUMBERS:num;
        initializeWokers(num);
    }

    /** * 初始化线程工作者 * @param defaultWorkerNumbers */
    private void initializeWokers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "thread-worker-"+threadNum.incrementAndGet());
            thread.start();
        }

    }

    @Override
    public void execute(Job job) {
        if(job != null){
            //添加一个工作到工作队列,然后通知一个工作者
            jobs.addLast(job);
            jobs.notify();//只通知一个就够了
        }
    }

    @Override
    public void shutdown() {
        for(Worker worker : workers){
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            //工作者数量不能超过最大值
            if(num+this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }

            initializeWokers(num);
            this.workerNum = workerNum + num;
        }

    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if(num > this.workerNum){

            }

            //关闭工作者
            int count = 0;
            while(count < num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }

            this.workerNum = this.workerNum - count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    class Worker implements Runnable { 
   
        //是否工作
        private volatile boolean running = true;

        @Override
        public void run() {
            while(running){
                Job job = null;
                synchronized (jobs) {
                    //如果工作列表为空,则等待
                    while(jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    //取出一个job
                    job = jobs.removeFirst();
                }

                //执行任务
                if(job != null){
                    job.run();
                }
            }

        }

        /** * 关闭工作者线程 */
        public void shutdown(){
            running = false;
        }
    }
}

源代码很简单,核心就是工作链表jobs,具体如下面列出的三点,其他的操作都是围绕次功能做的一些补充。
1、调用线程池执行任务的时候仅仅只是把任务放入到任务列表当中,然后唤醒被jobs对象锁阻塞的所有线程。
2、所有的synchronized,取的都是jobs的对象锁
3、工作者执行任务的流程依次是 获取jobs对象锁—轮询jobs链表,如果为空则jobs.wait(),否则取出jobs里面的第一个任务,然后释放jobs对象锁并唤醒所有阻塞的工作者。最后处理任务。

线程池的实现方法有很多种,但是核心思想都是想通的。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14252.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信