复制粘贴即可直接使用
把文件添加到项目中去
工作线程类 PooledThread(模仿线程池工厂的封装方式)
import java.util.Collection; import java.util.Vector; public class PooledThread extends Thread { protected Vector tasks = new Vector(); protected boolean running = false; protected boolean stopped = false; protected boolean paused = false; protected boolean killed = false; private ThreadPool pool; public PooledThread(ThreadPool pool){ this.pool = pool; } public void putTask(ThreadTask task){ tasks.add(task); } public void putTasks(ThreadTask[] tasks){ for(int i=0; i<tasks.length; i++) this.tasks.add(tasks[i]); } public void putTasks(Collection tasks){ this.tasks.addAll(tasks); } protected ThreadTask popTask(){ if(tasks.size() > 0) return (ThreadTask)tasks.remove(0); else return null; } public boolean isRunning(){ return running; } public void stopTasks(){ stopped = true; } public void stopTasksSync(){ stopTasks(); while(isRunning()){ try { sleep(5); } catch (InterruptedException e) { } } } public void pauseTasks(){ paused = true; } public void pauseTasksSync(){ pauseTasks(); while(isRunning()){ try { sleep(5); } catch (InterruptedException e) { } } } public void kill(){ if(!running) interrupt(); else killed = true; } public void killSync(){ kill(); while(isAlive()){ try { sleep(5); } catch (InterruptedException e) { } } } public synchronized void startTasks(){ running = true; this.notify(); } public synchronized void run(){ try{ while(true){ if(!running || tasks.size() == 0){ pool.notifyForIdleThread(); //System.out.println(Thread.currentThread().getId() + ": 空闲"); this.wait(); }else{ ThreadTask task; while((task = popTask()) != null){ task.run(); if(stopped){ stopped = false; if(tasks.size() > 0){ tasks.clear(); System.out.println(Thread.currentThread().getId() + ": Tasks are stopped"); break; } } if(paused){ paused = false; if(tasks.size() > 0){ System.out.println(Thread.currentThread().getId() + ": Tasks are paused"); break; } } } running = false; } if(killed){ killed = false; break; } } }catch(InterruptedException e){ return; } 
//System.out.println(Thread.currentThread().getId() + ": Killed"); } }
线程管理器
import com.nkr.ndp.manager.mqtt.LogHelper; import java.util.Collection; import java.util.Iterator; import java.util.Vector; public class ThreadPool { protected int maxPoolSize; protected int initPoolSize; protected Vector threads = new Vector(); protected boolean initialized = false; protected boolean hasIdleThread = false; public ThreadPool(int maxPoolSize, int initPoolSize){ this.maxPoolSize = maxPoolSize; this.initPoolSize = initPoolSize; } public void init(){ LogHelper.Info("线程池开始初始化"); initialized = true; for(int i=0; i<initPoolSize; i++){ PooledThread thread = new PooledThread(this); thread.start(); threads.add(thread); } LogHelper.Info("线程池初始化结束,线程数=" + threads.size() + " 最大线程数=" + maxPoolSize); } public void setMaxPoolSize(int maxPoolSize){ //System.out.println("重设最大线程数,最大线程数=" + maxPoolSize); this.maxPoolSize = maxPoolSize; if(maxPoolSize < getPoolSize()) setPoolSize(maxPoolSize); } /** * 重设当前线程数 * 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成 * 但此方***立刻从线程池中移除该线程,不会等待事务处理结束 * @param size */ public void setPoolSize(int size){ if(!initialized){ initPoolSize = size; return; }else if(size > getPoolSize()){ for(int i=getPoolSize(); i<size && i<maxPoolSize; i++){ PooledThread thread = new PooledThread(this); thread.start(); threads.add(thread); } }else if(size < getPoolSize()){ while(getPoolSize() > size){ PooledThread th = (PooledThread)threads.remove(0); th.kill(); } } //System.out.println("重设线程数,线程数=" + threads.size()); } public int getPoolSize(){ return threads.size(); } protected void notifyForIdleThread(){ hasIdleThread = true; } protected boolean waitForIdleThread(){ hasIdleThread = false; while(!hasIdleThread && getPoolSize() >= maxPoolSize){ try { Thread.sleep(5); } catch (InterruptedException e) { return false; } } return true; } public synchronized PooledThread getIdleThread(){ while(true){ for(Iterator itr=threads.iterator(); itr.hasNext();){ PooledThread th = (PooledThread)itr.next(); if(!th.isRunning()) return th; } if(getPoolSize() < maxPoolSize){ PooledThread 
thread = new PooledThread(this); thread.start(); threads.add(thread); return thread; } LogHelper.Info("线程池已满,等待..."); if(waitForIdleThread() == false) return null; } } public void processTask(ThreadTask task){ PooledThread th = getIdleThread(); if(th != null){ th.putTask(task); th.startTasks(); } } public void processTasksInSingleThread(ThreadTask[] tasks){ PooledThread th = getIdleThread(); if(th != null){ th.putTasks(tasks); th.startTasks(); } } public void processTasksInSingleThread(Collection tasks){ PooledThread th = getIdleThread(); if(th != null){ th.putTasks(tasks); th.startTasks(); } } }
任务接口 ThreadTask:实现 run() 方法,定义要在线程中执行的逻辑
/**
 * A unit of work that can be handed to the thread pool.
 */
public interface ThreadTask {

    /** Performs the work of this task. */
    void run();
}
如何使用
//初始化线程池 ThreadPool pool = new ThreadPool(2, 1); //实体类继承本文的线程管理器 public class ImsisExcel implements ThreadTask { public String taskId; public List<ImsiExcel> list; @Override public void run() { //重写的就是要调用的线程方法 exportExcel(this.taskId,this.list); } } //实现 ImsisExcel excel = new ImsisExcel(); excel.taskId = IdUtil.fastSimpleUUID(); excel.list=list; ThreadTask task = excel; pool.processTask(task);