测试demo, ThreadPoolExecutorTest:
public class ThreadPoolExecutorTest { public static void main(String[] args) throws InterruptedException { final boolean isFair = false; ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(10, isFair); // arrayBlockingQueue.add(new MyThreadTask(10086)); final int corePoolSize = 3; final int maximumPoolSize = 6; ThreadPoolExecutor threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, 1, TimeUnit.SECONDS, arrayBlockingQueue, new ThreadPoolExecutor.CallerRunsPolicy()); // threadPool.allowCoreThreadTimeOut(true); // Integer result = 9; for (int index = 1; index <= 10; index++) { Thread tempNewThread = new MyThreadTask(index); threadPool.execute(tempNewThread); // result = threadPool.submit(new MyThreadTask(i), result); } // threadPool.shutdown(); } }
ThreadPoolExecutor 抽出来的一些核心方法:
public class ThreadPoolExecutor extends AbstractExecutorService { private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); /*** * 线程中真正执行的Worker线程 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /*** * 代理, 执行上层的runWorker方法; */ public void run() { runWorker(this); } } /*** * 把firstTask 添加到核心线程, 并启动; * @param firstTask * @param core 是否是核心线程 * @return */ private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } /*** * 从 workQueue 的线程等待队列中获取线程(后面准备执行); * @return */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } /*** * 运行Worker线程; * * while (task != null || (task = getTask()) != null) * 第一次判断是当前的核心线程; * 第二个判断是核心线程第一次执行完毕, 则从workQueue中获取线程继续执行; * * task.run(); * 直接调用的run方法(外层已经有worker的线程包装起的) */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } /*** * 执行, 线程池执行线程的总入口 * @param command */ public void execute(Runnable command) { int c = ctl.get(); // 核心线程执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 核心已经多了, 添加到 workQueue if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } /** * 其中一个拒绝策略 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } }
相关推荐
死磕ThreadPoolExecutor线程池.pdf!!死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf死磕ThreadPoolExecutor线程池.pdf
线程池ThreadPoolExecutor的源码分析,含中文注释,深入了解线程池的构造
线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor
ThreadPoolExecutor线程池
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-周瑜.pdf 12-线程池 ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf 13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、...
线程是系统中可执行调度的...线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,规避资源耗尽的风险。集合的详细描述,以及集合中的异同点,HashMap不同jdk版本区别,ConcurrentHashMap介绍。
主要为大家详细介绍了ThreadPoolExecutor线程池的使用方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
Java,线程池,ThreadPoolExecutor
线程池ThreadPoolExecutor底层原理源码分析
线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...
主要介绍了解决python ThreadPoolExecutor 线程池中的异常捕获问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
java线程池源码 cThreadPool 项目描述:对java.util.concurrent包下线程池相关源码进行重新实现,深入研究和学习线程池超时机制、饱和策略、生命周期等知识 ThreadPoolExecutor类下部分方法和内部类介绍: 1、Worker...
提供工厂方法来创建不同类型的线程池,这篇文章主要介绍了Java ThreadPoolExecutor 线程池的使用介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来...
ThreadPoolExecutor源码解析.md
下面小编就为大家带来一篇简单谈谈ThreadPoolExecutor线程池之submit方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,...
ThreadPoolExecutor线程池源码