大家好,欢迎来到IT知识分享网。
直接先看代码。
package resourse.treadPool;
/** * 线程池定义接口 */
public interface ThreadPool<Job extends Runnable> {
/** 执行一个job*/
void execute(Job job);
/** 关闭线程池*/
void shutdown();
/**增加工作者线程*/
void addWorkers(int num);
/**减少工作者线程*/
void removeWorker(int num);
/** 得到正在等待执行的任务数量*/
int getJobSize();
}
package resourse.treadPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/** * 线程池简单实现 * @param <Job> */
public class DefaultTreadPool<Job extends Runnable> implements ThreadPool<Job> {
//线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
//线程池默认数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
//线程池最少数量
private static final int MIN_WORKER_NUMBERS = 1;
//工作列表
private LinkedList<Job> jobs = new LinkedList<Job>();
//工作者列表
private List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
//工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
//线程编号生成
private AtomicLong threadNum = new AtomicLong();
public DefaultTreadPool(){
initializeWokers(DEFAULT_WORKER_NUMBERS);
}
private void DefaultTreadPool(int num) {
workerNum = num>MAX_WORKER_NUMBERS?MAX_WORKER_NUMBERS:num<MIN_WORKER_NUMBERS?MIN_WORKER_NUMBERS:num;
initializeWokers(num);
}
/** * 初始化线程工作者 * @param defaultWorkerNumbers */
private void initializeWokers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "thread-worker-"+threadNum.incrementAndGet());
thread.start();
}
}
@Override
public void execute(Job job) {
if(job != null){
//添加一个工作到工作队列,然后通知一个工作者
jobs.addLast(job);
jobs.notify();//只通知一个就够了
}
}
@Override
public void shutdown() {
for(Worker worker : workers){
worker.shutdown();
}
}
@Override
public void addWorkers(int num) {
synchronized (jobs) {
//工作者数量不能超过最大值
if(num+this.workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWokers(num);
this.workerNum = workerNum + num;
}
}
@Override
public void removeWorker(int num) {
synchronized (jobs) {
if(num > this.workerNum){
}
//关闭工作者
int count = 0;
while(count < num){
Worker worker = workers.get(count);
if(workers.remove(worker)){
worker.shutdown();
count++;
}
}
this.workerNum = this.workerNum - count;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
class Worker implements Runnable {
//是否工作
private volatile boolean running = true;
@Override
public void run() {
while(running){
Job job = null;
synchronized (jobs) {
//如果工作列表为空,则等待
while(jobs.isEmpty()){
try {
jobs.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//取出一个job
job = jobs.removeFirst();
}
//执行任务
if(job != null){
job.run();
}
}
}
/** * 关闭工作者线程 */
public void shutdown(){
running = false;
}
}
}
源代码很简单,核心就是工作链表jobs,具体如下面列出的三点,其他的操作都是围绕次功能做的一些补充。
1、调用线程池执行任务的时候仅仅只是把任务放入到任务列表当中,然后唤醒被jobs对象锁阻塞的所有线程。
2、所有的synchronized,取的都是jobs的对象锁
3、工作者执行任务的流程依次是 获取jobs对象锁—轮询jobs链表,如果为空则jobs.wait(),否则取出jobs里面的第一个任务,然后释放jobs对象锁并唤醒所有阻塞的工作者。最后处理任务。
线程池的实现方法有很多种,但是核心思想都是想通的。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14252.html