🗒️线程池
2024-10-31
| 2024-10-31
0  |  阅读时长 0 分钟
type
status
date
slug
summary
tags
category
icon
password
创建时间
Oct 31, 2024 01:40 PM

ThreadPoolExecutor

线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名
高3位
接收新任务
处理阻塞队列任务
说明
RUNNING
111
Y
Y
SHUTDOWN
000
N
Y
不会接收新任务,但会处理阻塞队列剩余任务
STOP
001
N
N
会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING
010
任务全执行完毕,活动线程为0即将进入终结
TERMINATED
011
终结状态
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING 这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

构造方法──7 个参数

notion image
  • corePoolSize:核心线程数,即线程池中始终保持的线程数量。
  • maximumPoolSize:最大线程数,即线程池中允许的最大线程数量。
  • keepAliveTime:线程空闲时间(生存时间),超过这个时间的非核心线程会被销毁。──针对救急线程
  • unit:时间单位──针对救急线程
  • workQueue:任务队列(阻塞队列),存放待执行的任务。
  • threadFactory:线程工厂,用于创建新线程。
  • handler:拒绝策略
工作方式:
  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  1. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。工作队列类型如下:
      • SynchronousQueue:不存储任务,直接将任务提交给线程。
      • LinkedBlockingQueue:链表结构的阻塞队列,大小无限
      • ArrayBlockingQueue:数组结构的有界阻塞队列。
      • PriorityBlockingQueue:带优先级的无界阻塞队列。
  1. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。──救急线程出现的前提是有界队列
  1. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
      • AbortPolicy:这是默认的拒绝策略。当任务被拒绝时,它会抛出RejectedExecutionException异常。适用于必须通知调用者任务未能被执行的场景。
      • CallerRunsPolicy:这个策略不会抛出异常,也不会丢弃任务。相反,它会将任务回退到调用者线程中,由调用者线程来执行。适用于希望通过减缓任务提交速度来稳定系统的场景。
      • DiscardOldestPolicy:这个策略会尝试丢弃队列中最老的任务,然后重新尝试提交新任务(如果可能的话)。适用于希望丢弃最旧的任务以保证新的重要任务能被重新处理的任务
      • DiscardPolicy:这个策略默默地丢弃被拒绝的任务,不做任何处理。适用于对部分任务丢弃没有影响的场景,或系统负载较高时,不需要处理所有任务。
  1. 当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTimeunit 来控制。

线程池实现

FixedThreadPool :通过 newFixedThreadPool 来创建
特点
  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务──可能会 OOM
  • 适用于任务量已知,相对耗时的任务。适合任务数量相对固定,且需要限制线程数的场景,避免线程过多占用系统资源。
CachedThreadPool :通过 newCachedThreadPool 来创建
特点
  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况。更适合大量短期任务或任务数量不确定的场景,能够根据任务量动态调整线程数。
SingleThreadExecutor :通过 newSingleThreadExecutor 来创建
特点
  • 区别
    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作
    • Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改:FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
    • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改:对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
  • 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
ScheduledThreadPool:通过 newScheduledThreadPool 来创建
scheduleAtFixedRate 以固定速率执行任务!
scheduleWithFixedDelay 每个任务之间的间隔时间!从结束时间来算的!
WorkStealingPool:通过 newWorkStealingPool 来创建
底层实现:实际上,newworkStealingPool() 返回的是一个基于 ForkJoinPool 的实例,因此它本质上是利用了 Fork/Join 框架下的工作窃取机制。
适合处理大量的小任务,能更好地利用CPU资源。
以上都是使用 Executors 来创建。
ForkJoinPool :通过 ForkJoinPool 来创建
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算。所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了 算效率。 Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
ForkJoinPool 相比于 ThreadPoolExecutor,还有一个非常重要的特点(优点)在于,ForkJoinPool具有 Work-Stealing (工作窃取)的能力。所谓 Work-Stealing,在 ForkJoinPool 中的实现为:线程池中每个线程都有一个互不影响的任务队列(双端队列),线程每次都从自己的任务队列的队头中取出一个任务来运行;如果某个线程对应的队列已空并且处于空闲状态,而其他线程的队列中还有任务需要处理但是该线程处于工作状态,那么空闲的线程可以从其他线程的队列的队尾取一个任务来帮忙运行 —— 感觉就像是空闲的线程去偷人家的任务来运行一样,所以叫 “工作窃取”。
Work-Stealing 的适用场景是不同的任务的耗时相差比较大,即某些任务需要运行较长时间,而某些任务会很快的运行完成,这种情况下用 Work-Stealing 很合适;但是如果任务的耗时很平均,则此时 Work-Stealing 并不适合,因为窃取任务时不同线程需要抢占锁,这可能会造成额外的时间消耗,而且每个线程维护双端队列也会造成更大的内存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作为对 ThreadPoolExecutor 的补充。
ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(线程池),但ForkJoinPool 的独特点在于:
  1. ThreadPoolExecutor 只能执行 Runnable 和 Callable 任务,而 ForkJoinPool 不仅可以执行 Runnable 和 Callable 任务,还可以执行 Fork/Join 型任务 —— ForkJoinTask —— 从而满足并行地实现分治算法的需要;
  1. ThreadPoolExecutor 中任务的执行顺序是按照其在共享队列中的顺序来执行的,所以后面的任务需要等待前面任务执行完毕后才能执行,而 ForkJoinPool 每个线程有自己的任务队列,并在此基础上实现了 Work-Stealing 的功能,使得在某些情况下 ForkJoinPool 能更大程度的提高并发效率。

将任务交给线程池执行──提交任务

execute和submit的区别

  • execute只能提交Runnable类型的任务,无返回值。submit既可以提交Runnable类型的任务,也可以提交Callable类型的任务,会有一个类型为Future的返回值,但当任务类型为Runnable时,返回值为null
  • execute在执行任务时,如果遇到异常会直接抛出,而submit不会直接抛出,只有在使用Futureget方法获取返回值时,才会抛出异常。

关闭线程池

  • shutdown:启动线程池的平滑关闭。它不再接受新的任务,但会继续执行已经提交的任务(包括在队列中的任务)。线程池会进入SHUTDOWN状态,所有已执行和正在执行的任务都会继续完成,只有所有任务完成后,线程池才会完全终止。
  • shutdownNow:启动线程池的强制关闭。它会尝试停止所有正在执行的任务,并返回等待执行的任务列表。它会尽力中断正在执行的任务,但不能保证所有任务都能被立即停止。线程池进入 STOP 状态,除了尝试中断正在执行的任务外,还会清空任务队列,返回未执行的任务列表。

捕捉异常

关于抛异常,来自京东技术
  • 当执行方式是execute时,可以看到堆栈异常的输出,线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
  • 当执行方式是submit时,堆栈异常没有输出。但是调用Future.get()方法时,可以捕获到异常,不会把这个线程移除掉,也不会创建新的线程放入到线程池中。
默认情况下,线程池不会直接报告哪个线程发生了异常,但是可以采取以下几种方法:
  • 自定义线程池的 ThreadFactory : 通过自定义 ThreadFactory,为每个线程设置一个异常处理器(UncaughtExceptionHandler ),在其中记录发生异常的线程信息。
  • 使用 Future: 提交任务时使用 submit() 方法,而不是 execute(),这样可以通过 Future 对象捕获并检查任务的执行结果和异常。
  • 任务内部手动捕获异常并记录: 在任务的 run() 方法内部,使用 try-catch 结构捕获异常,并记录或处理异常,同时记录线程信息。

📎 参考

 
基于数组实现一个大顶堆,并提供添加元素和删除堆顶元素的操作Daily Note 笔记法实践
Loading...