054 《Folly CPUThreadPoolExecutor.h 权威指南:原理、实践与高级应用》
🌟🌟🌟本文案由Gemini 2.0 Flash Thinking Experimental 01-21创作,用来辅助学习知识。🌟🌟🌟
书籍大纲
▮▮▮▮ 1. chapter 1: 线程池(Thread Pool)与并发编程(Concurrent Programming)基础
▮▮▮▮▮▮▮ 1.1 并发(Concurrency)与并行(Parallelism)概念
▮▮▮▮▮▮▮ 1.2 为什么需要线程池(Why Thread Pool)?
▮▮▮▮▮▮▮ 1.3 线程池的优势与应用场景(Advantages and Application Scenarios of Thread Pool)
▮▮▮▮▮▮▮ 1.4 常见的线程池实现方案(Common Thread Pool Implementations)
▮▮▮▮▮▮▮▮▮▮▮ 1.4.1 标准库线程池(Standard Library Thread Pool)简介
▮▮▮▮▮▮▮▮▮▮▮ 1.4.2 folly
库概述(Overview of folly
Library)及其在并发编程中的作用
▮▮▮▮▮▮▮▮▮▮▮ 1.4.3 folly::CPUThreadPoolExecutor
的定位与特点(Positioning and Features of folly::CPUThreadPoolExecutor
)
▮▮▮▮ 2. chapter 2: folly::CPUThreadPoolExecutor
核心概念与基础使用
▮▮▮▮▮▮▮ 2.1 CPUThreadPoolExecutor
的设计哲学(Design Philosophy of CPUThreadPoolExecutor
)
▮▮▮▮▮▮▮ 2.2 CPUThreadPoolExecutor
的基本架构(Basic Architecture of CPUThreadPoolExecutor
)
▮▮▮▮▮▮▮ 2.3 核心组件详解(Detailed Explanation of Core Components)
▮▮▮▮▮▮▮▮▮▮▮ 2.3.1 工作线程(Worker Thread)
▮▮▮▮▮▮▮▮▮▮▮ 2.3.2 任务队列(Task Queue)
▮▮▮▮▮▮▮▮▮▮▮ 2.3.3 线程工厂(Thread Factory)
▮▮▮▮▮▮▮ 2.4 CPUThreadPoolExecutor
的创建与初始化(Creation and Initialization of CPUThreadPoolExecutor
)
▮▮▮▮▮▮▮ 2.5 任务提交与执行(Task Submission and Execution)
▮▮▮▮▮▮▮▮▮▮▮ 2.5.1 add()
方法:提交任务(Submit Task with add()
)
▮▮▮▮▮▮▮▮▮▮▮ 2.5.2 start()
方法:启动线程池(Start Thread Pool with start()
)
▮▮▮▮▮▮▮▮▮▮▮ 2.5.3 stop()
方法:停止线程池(Stop Thread Pool with stop()
)
▮▮▮▮▮▮▮ 2.6 基础代码示例:快速上手 CPUThreadPoolExecutor
(Basic Code Example: Getting Started with CPUThreadPoolExecutor
)
▮▮▮▮ 3. chapter 3: folly::CPUThreadPoolExecutor
进阶特性与配置
▮▮▮▮▮▮▮ 3.1 线程池大小动态调整(Dynamic Adjustment of Thread Pool Size)
▮▮▮▮▮▮▮ 3.2 任务优先级(Task Priority)管理
▮▮▮▮▮▮▮ 3.3 自定义任务队列(Custom Task Queue)
▮▮▮▮▮▮▮ 3.4 异常处理(Exception Handling)机制
▮▮▮▮▮▮▮ 3.5 线程池监控与指标(Thread Pool Monitoring and Metrics)
▮▮▮▮▮▮▮▮▮▮▮ 3.5.1 性能指标(Performance Metrics)
▮▮▮▮▮▮▮▮▮▮▮ 3.5.2 监控工具集成(Monitoring Tool Integration)
▮▮▮▮▮▮▮ 3.6 配置选项详解(Detailed Explanation of Configuration Options)
▮▮▮▮▮▮▮▮▮▮▮ 3.6.1 线程池参数配置(Thread Pool Parameter Configuration)
▮▮▮▮▮▮▮▮▮▮▮ 3.6.2 队列类型选择(Queue Type Selection)
▮▮▮▮ 4. chapter 4: folly::CPUThreadPoolExecutor
高级应用与实战
▮▮▮▮▮▮▮ 4.1 高并发场景下的 CPUThreadPoolExecutor
应用(CPUThreadPoolExecutor
Application in High Concurrency Scenarios)
▮▮▮▮▮▮▮ 4.2 结合 folly
库其他组件的应用(Application with Other Components of folly
Library)
▮▮▮▮▮▮▮ 4.3 异步编程(Asynchronous Programming)与 CPUThreadPoolExecutor
▮▮▮▮▮▮▮ 4.4 性能优化与调优(Performance Optimization and Tuning)
▮▮▮▮▮▮▮▮▮▮▮ 4.4.1 线程池大小选择策略(Thread Pool Size Selection Strategy)
▮▮▮▮▮▮▮▮▮▮▮ 4.4.2 任务调度优化(Task Scheduling Optimization)
▮▮▮▮▮▮▮ 4.5 案例分析:使用 CPUThreadPoolExecutor
构建高性能服务(Case Study: Building High-Performance Services with CPUThreadPoolExecutor
)
▮▮▮▮ 5. chapter 5: folly::CPUThreadPoolExecutor
API 全面解析
▮▮▮▮▮▮▮ 5.1 类结构与继承关系(Class Structure and Inheritance Relationship)
▮▮▮▮▮▮▮ 5.2 公有成员函数(Public Member Functions)详解
▮▮▮▮▮▮▮▮▮▮▮ 5.2.1 构造函数与析构函数(Constructor and Destructor)
▮▮▮▮▮▮▮▮▮▮▮ 5.2.2 任务管理相关函数(Task Management Related Functions)
▮▮▮▮▮▮▮▮▮▮▮ 5.2.3 线程池控制函数(Thread Pool Control Functions)
▮▮▮▮▮▮▮ 5.3 保护成员函数(Protected Member Functions)解析
▮▮▮▮▮▮▮ 5.4 示例代码与 API 使用场景(Example Code and API Usage Scenarios)
▮▮▮▮ 6. chapter 6: folly::CPUThreadPoolExecutor
与其他线程池的比较
▮▮▮▮▮▮▮ 6.1 与标准库线程池的对比(Comparison with Standard Library Thread Pool)
▮▮▮▮▮▮▮ 6.2 与其他开源线程池的对比(Comparison with Other Open Source Thread Pools)
▮▮▮▮▮▮▮ 6.3 CPUThreadPoolExecutor
的优势与劣势(Advantages and Disadvantages of CPUThreadPoolExecutor
)
▮▮▮▮▮▮▮ 6.4 适用场景分析与选择建议(Applicable Scenarios Analysis and Selection Recommendations)
▮▮▮▮ 7. chapter 7: 总结与展望
▮▮▮▮▮▮▮ 7.1 CPUThreadPoolExecutor
的核心价值总结(Summary of Core Value of CPUThreadPoolExecutor
)
▮▮▮▮▮▮▮ 7.2 未来发展趋势与展望(Future Development Trends and Prospects)
▮▮▮▮▮▮▮ 7.3 最佳实践与建议(Best Practices and Recommendations)
1. chapter 1: 线程池(Thread Pool)与并发编程(Concurrent Programming)基础
1.1 并发(Concurrency)与并行(Parallelism)概念
在深入探讨线程池之前,我们首先需要厘清并发(Concurrency)与并行(Parallelism)这两个经常被提及但又容易混淆的概念。理解它们之间的区别是掌握线程池技术乃至整个并发编程的基础。
并发(Concurrency) 描述的是程序结构的设计方式,它允许程序在逻辑上“同时”处理多个任务。在并发系统中,多个任务可以在同一时间段内被启动、运行和完成,但它们并不一定在同一时刻真正执行。更准确地说,并发关注的是如何管理多个任务,使得用户感觉这些任务在同时进行。
一个常见的比喻是单核CPU上的多任务处理。操作系统通过时间片轮转调度算法,快速地在多个任务之间切换执行,使得用户感觉多个程序在同时运行。实际上,在任何一个时间点,CPU 仍然只执行一个任务,只不过切换速度非常快,造成了“同时”执行的假象。
并行(Parallelism) 则描述的是程序执行的方式,它指的是程序中的多个任务在同一时刻被物理上同时执行。并行通常需要多核处理器或分布式系统的支持,每个核心或节点可以独立地执行不同的任务。
继续使用之前的比喻,并行就像是多核CPU上的多任务处理。每个 CPU 核心可以独立执行一个任务,因此多个任务可以真正地同时进行。例如,在一个四核处理器上,理论上可以同时并行执行四个独立的线程或进程。
为了更形象地理解,我们可以用排队买咖啡的例子来区分并发与并行:
⚝ 并发:只有一个咖啡师(单核 CPU),顾客(任务)排成一队。咖啡师轮流为顾客制作咖啡,当咖啡师在制作一杯咖啡时,其他顾客需要等待。虽然顾客感觉好像都在“同时”等待咖啡,但实际上咖啡师是交替处理每个顾客的需求。
⚝ 并行:有多个咖啡师(多核 CPU),顾客可以分成多队,每个咖啡师同时为一个或多个顾客制作咖啡。这样,多位顾客可以同时拿到咖啡,整体效率更高。
特征 | 并发(Concurrency) | 并行(Parallelism) |
---|---|---|
目的 | 管理多个任务,提高程序响应性 | 加速程序执行,提高吞吐量 |
执行方式 | 任务在时间上重叠,逻辑上同时发生,但可能交替执行 | 任务在物理上同时执行 |
硬件需求 | 单核或多核 CPU 均可 | 通常需要多核 CPU 或分布式系统支持 |
关注点 | 程序结构设计,任务调度,资源共享与同步 | 程序执行效率,任务分解,并行算法设计 |
关系 | 并发是并行的必要条件,并行是并发的理想执行状态 | 并行是并发的一种实现方式,并发可以通过并行来加速执行 |
示例 | 单核 CPU 上运行多线程程序,异步 I/O,事件驱动编程 | 多核 CPU 上运行多线程程序,分布式计算,GPU 加速计算 |
总结来说,并发是程序设计的一种思想,而并行是程序执行的一种状态。并发程序可以运行在单核或多核系统上,而并行程序则需要多核系统才能发挥其优势。在实际应用中,我们常常需要同时考虑并发和并行,利用并发的设计思想来组织程序结构,并尽可能地利用并行来提高程序的执行效率。
1.2 为什么需要线程池(Why Thread Pool)?
在传统的并发编程中,我们通常通过创建线程来执行并发任务。然而,频繁地创建和销毁线程会带来显著的性能开销,尤其是在高并发、任务量大的场景下。线程的创建和销毁涉及到操作系统资源的分配和回收,是一个相对重量级的操作。
线程创建的开销 主要体现在以下几个方面:
⚝ 时间开销:创建线程需要向操作系统申请资源,包括内核数据结构、堆栈空间等,这个过程需要消耗一定的时间。在高并发场景下,如果每个任务都创建一个新线程,线程创建的延迟会累积起来,影响程序的响应速度。
⚝ 资源开销:每个线程都需要占用一定的内存空间(例如,线程栈),大量线程的创建会消耗大量的系统内存资源,甚至可能导致系统资源耗尽。
⚝ 上下文切换开销:当线程数量过多时,CPU 在线程之间频繁切换,上下文切换本身也会带来额外的开销,降低 CPU 的有效利用率。
为了解决上述问题,线程池(Thread Pool) 技术应运而生。线程池是一种池化技术,它预先创建一组线程,并将这些线程放入一个“池子”中。当有新的任务需要执行时,线程池会从池子中取出一个空闲线程来执行任务,任务执行完毕后,线程并不会立即销毁,而是返回到池子中等待执行下一个任务。
使用线程池可以有效地减少线程创建和销毁的开销,提高系统的性能和资源利用率。线程池的核心思想是线程复用,通过复用已创建的线程来执行新的任务,避免了频繁创建和销毁线程的开销。
线程池的主要优势 可以概括为以下几点:
① 降低资源消耗:通过线程复用,减少了线程创建和销毁的次数,降低了系统资源的消耗,尤其是在高并发场景下,效果更加明显。
② 提高响应速度:当有新任务到达时,可以直接从线程池中获取空闲线程来执行,无需等待线程创建,提高了系统的响应速度。
③ 提高线程的可管理性:线程池可以统一管理和监控线程,例如,可以限制线程池中线程的最大数量,防止资源耗尽;可以设置线程的优先级、名称等属性,方便管理和调试。
④ 提供更强大的功能:许多线程池实现还提供了诸如任务队列、拒绝策略、线程池监控等高级功能,可以更好地满足复杂并发场景的需求。
总结,线程池的出现是为了解决传统线程使用方式在高并发场景下的性能瓶颈问题。通过线程复用和资源池化,线程池能够有效地降低线程管理的开销,提高系统的性能、响应速度和可管理性,是现代并发编程中不可或缺的重要组件。在需要处理大量并发任务的场景下,例如 Web 服务器、数据库连接池、消息队列等,线程池都得到了广泛的应用。
1.3 线程池的优势与应用场景(Advantages and Application Scenarios of Thread Pool)
上一节我们已经介绍了线程池的基本概念和为什么需要线程池。本节将进一步总结线程池的优势,并探讨线程池的常见应用场景。
线程池的优势 可以归纳为以下几个方面(在 1.2 节的基础上进一步展开):
① 性能提升 🚀:
▮▮▮▮ⓑ 减少线程生命周期开销:线程的创建和销毁是昂贵的操作。线程池通过复用已创建的线程,避免了频繁的线程创建和销毁,显著减少了这部分开销。
▮▮▮▮ⓒ 降低上下文切换开销:虽然线程池本身并不能直接减少上下文切换,但通过控制线程池的大小,可以避免创建过多的线程,从而间接地降低了由于过多线程竞争 CPU 资源而导致的上下文切换开销。
④ 资源管理 🧰:
▮▮▮▮ⓔ 资源复用:线程池中的线程可以被多个任务复用,提高了线程资源的利用率。
▮▮▮▮ⓕ 资源限制:线程池可以限制线程的最大数量,防止无限制地创建线程导致系统资源耗尽,例如内存溢出、CPU 负载过高等问题。这对于构建稳定可靠的系统至关重要。
⑦ 提高响应性 ⚡️:
▮▮▮▮ⓗ 快速任务执行:当有新任务到达时,线程池可以立即从池中获取空闲线程来执行,无需等待线程创建,缩短了任务的等待时间,提高了系统的响应速度。
▮▮▮▮ⓘ 平滑性能波动:在高负载时,线程池可以控制并发线程的数量,避免系统因瞬间过载而崩溃,使系统性能更加平稳。
⑩ 功能增强 ✨:
▮▮▮▮ⓚ 任务队列:线程池通常内置任务队列,用于缓存待执行的任务。任务队列可以实现任务的排队、优先级调度等功能,使得任务管理更加灵活和可控。
▮▮▮▮ⓛ 拒绝策略:当任务队列已满且线程池中的线程都在忙碌时,线程池需要采取拒绝策略来处理新提交的任务。常见的拒绝策略包括抛出异常、丢弃任务、阻塞提交等,可以根据不同的应用场景选择合适的策略。
▮▮▮▮ⓜ 监控与调优:线程池通常提供监控接口,可以获取线程池的运行状态,例如活跃线程数、任务队列长度、已完成任务数等指标。这些指标可以帮助我们了解线程池的性能瓶颈,并进行相应的调优。
线程池的应用场景 非常广泛,几乎所有需要并发处理任务的场景都可以考虑使用线程池。以下是一些典型的应用场景:
① Web 服务器 🌐:Web 服务器需要处理大量的客户端请求,每个请求通常需要在一个独立的线程中处理。使用线程池可以有效地管理处理请求的线程,提高服务器的并发处理能力和响应速度。例如,Tomcat、Nginx 等 Web 服务器都使用了线程池技术。
② 数据库连接池 🗄️:数据库连接的创建和销毁也是比较昂贵的操作。数据库连接池预先创建一定数量的数据库连接,当应用程序需要访问数据库时,从连接池中获取连接,使用完毕后将连接返回连接池,实现数据库连接的复用,提高数据库访问性能。
③ 消息队列 ✉️:消息队列系统需要处理大量的消息生产和消费请求。使用线程池可以并发地处理消息的生产和消费,提高消息队列系统的吞吐量和实时性。例如,Kafka、RabbitMQ 等消息队列系统内部也使用了线程池。
④ 异步任务处理 ⏱️:对于一些耗时的异步任务,例如文件上传、视频转码、发送邮件等,可以使用线程池来异步执行这些任务,避免阻塞主线程,提高用户体验。
⑤ 并行计算 🧮:对于一些计算密集型任务,例如图像处理、科学计算、大数据分析等,可以使用线程池将任务分解成多个子任务并行执行,充分利用多核 CPU 的计算能力,加速任务完成。
⑥ 缓存系统 📦:缓存系统需要快速响应大量的读写请求。使用线程池可以并发地处理缓存的读写操作,提高缓存系统的并发访问能力。例如,Redis、Memcached 等缓存系统也使用了线程池。
总而言之,线程池是一种通用的并发编程工具,适用于各种需要并发处理任务的场景。合理地使用线程池可以显著提高系统的性能、资源利用率和可维护性。在实际开发中,我们需要根据具体的应用场景和需求,选择合适的线程池实现和配置参数,才能充分发挥线程池的优势。
1.4 常见的线程池实现方案(Common Thread Pool Implementations)
线程池作为一种重要的并发编程模式,在各种编程语言和框架中都有广泛的实现。本节将介绍一些常见的线程池实现方案,包括标准库线程池、folly
库以及 folly::CPUThreadPoolExecutor
的定位和特点。
1.4.1 标准库线程池(Standard Library Thread Pool)简介
许多编程语言的标准库都提供了线程池的实现,方便开发者直接使用。这些标准库线程池通常具有良好的通用性和易用性,能够满足大多数常见的并发任务处理需求。
以 Java 为例,java.util.concurrent
包提供了丰富的线程池实现,例如:
⚝ ThreadPoolExecutor
:是 Java 线程池的核心类,提供了最灵活的配置选项,可以根据不同的需求创建各种类型的线程池。
⚝ FixedThreadPool
:固定大小的线程池,池中线程数量固定不变,适用于任务量稳定且需要快速响应的场景。
⚝ CachedThreadPool
:可缓存的线程池,线程数量可以动态伸缩,适用于任务量波动较大且任务执行时间较短的场景。
⚝ ScheduledThreadPool
:定时任务线程池,可以执行定时任务和周期性任务。
⚝ SingleThreadExecutor
:单线程线程池,池中只有一个线程,保证所有任务按照提交顺序依次执行。
C++11 标准库也引入了 <thread>
和 <future>
等头文件,提供了一些基本的并发编程工具,但并没有直接提供线程池的实现。在 C++ 中,开发者通常需要手动实现线程池,或者使用第三方库,例如 Boost.Asio、Qt Concurrent、以及我们本书重点介绍的 folly
库。
Python 的 concurrent.futures
模块提供了 ThreadPoolExecutor
和 ProcessPoolExecutor
,分别用于创建线程池和进程池,方便进行并发和并行编程。
Go 语言通过 goroutine 和 channel 提供了轻量级的并发机制,goroutine 可以看作是轻量级的线程,Go 语言的调度器可以高效地管理大量的 goroutine,因此在 Go 语言中,通常不需要显式地使用线程池,goroutine 本身就提供了类似线程池的功能。
标准库线程池的优点是易用性和通用性,可以直接在标准库中使用,无需引入额外的依赖。然而,标准库线程池的灵活性和性能可能相对有限,对于一些高性能、高并发的场景,可能需要更专业的线程池实现。
1.4.2 folly
库概述(Overview of folly
Library)及其在并发编程中的作用
folly
(Facebook Open Source Library) 是 Facebook 开源的一个 C++ 库,它包含了大量的通用组件,旨在提供高性能、高可靠性的 C++ 基础库。folly
库在 Facebook 内部被广泛使用,经过了大规模、高负载的生产环境的验证,具有很高的成熟度和稳定性。
folly
库的设计目标是补充 C++ 标准库的不足,提供更强大、更高效的工具和组件。folly
库涵盖了广泛的领域,包括:
⚝ 并发与异步编程:folly::Future
, folly::Promise
, folly::Executor
, folly::ThreadPoolExecutor
等,提供了丰富的异步编程工具和线程池实现。
⚝ 容器与数据结构:folly::Vector
, folly::FBString
, folly::HashMap
等,提供了高性能的容器和数据结构。
⚝ 网络编程:folly::Socket
, folly::IOBuf
, folly::EventBase
等,提供了高效的网络编程库。
⚝ 时间与日期:folly::Clock
, folly::DateTime
等,提供了高精度的时间和日期处理库。
⚝ 字符串处理:folly::StringPiece
, folly::Ascii
, folly::Format
等,提供了高效的字符串处理工具。
⚝ 配置与选项:folly::Options
, folly::CommandLineFlags
等,提供了灵活的配置管理工具。
在并发编程方面,folly
库提供了非常强大的支持。folly::Future
和 folly::Promise
是 folly
异步编程的核心组件,它们类似于 C++11 的 std::future
和 std::promise
,但功能更加强大,性能更高。folly::Executor
是一个抽象的执行器接口,用于执行任务,folly::ThreadPoolExecutor
则是 folly
库提供的线程池实现。
folly
库在并发编程中的作用主要体现在以下几个方面:
① 高性能异步编程:folly::Future
和 folly::Promise
提供了强大的异步编程模型,可以方便地构建高性能的异步应用。
② 灵活的执行器框架:folly::Executor
接口和各种执行器实现(包括 folly::ThreadPoolExecutor
)提供了灵活的任务执行框架,可以根据不同的需求选择合适的执行器。
③ 丰富的并发工具:folly
库还提供了许多其他的并发工具,例如 folly::Baton
, folly::Latch
, folly::Semaphore
等,可以帮助开发者更方便地进行并发编程。
④ 与 Facebook 内部基础设施的集成:folly
库与 Facebook 内部的基础设施紧密集成,例如 Facebook 的异步 RPC 框架 Thrift 就使用了 folly::Future
和 folly::Executor
。
总而言之,folly
库是一个非常强大和全面的 C++ 库,尤其在并发编程方面,提供了许多高性能、高可靠性的工具和组件。对于需要构建高性能 C++ 并发应用的开发者来说,folly
库是一个非常值得学习和使用的库。
1.4.3 folly::CPUThreadPoolExecutor
的定位与特点(Positioning and Features of folly::CPUThreadPoolExecutor
)
folly::CPUThreadPoolExecutor
是 folly
库提供的专门用于 CPU 密集型任务的线程池实现。与通用的线程池相比,CPUThreadPoolExecutor
在设计上更加注重CPU 资源的有效利用和低延迟。
CPUThreadPoolExecutor
的定位 可以概括为以下几点:
⚝ CPU 密集型任务优化:CPUThreadPoolExecutor
针对 CPU 密集型任务进行了优化,例如计算密集型任务、数据处理任务等。对于 I/O 密集型任务,可能不是最佳选择,folly
库还提供了 IOThreadPoolExecutor
用于处理 I/O 密集型任务。
⚝ 低延迟、高吞吐量:CPUThreadPoolExecutor
旨在提供低延迟和高吞吐量的任务执行能力,适用于对性能要求较高的场景。
⚝ 灵活的配置选项:CPUThreadPoolExecutor
提供了丰富的配置选项,可以根据不同的应用场景进行灵活的配置,例如线程池大小、任务队列类型、拒绝策略等。
⚝ 与 folly::Future
和 folly::Promise
的良好集成:CPUThreadPoolExecutor
与 folly
库的异步编程组件 folly::Future
和 folly::Promise
无缝集成,可以方便地构建异步并发应用。
CPUThreadPoolExecutor
的主要特点 包括:
① 基于 folly::Executor
接口:CPUThreadPoolExecutor
实现了 folly::Executor
接口,可以与其他 folly
库的组件(例如 folly::Future
)无缝集成。
② 可配置的线程池大小:可以根据 CPU 核心数和任务负载动态调整线程池的大小,以达到最佳的性能。通常建议线程池大小设置为 CPU 核心数或略多于 CPU 核心数,以充分利用 CPU 资源,又避免过多的上下文切换。
③ 多种任务队列类型:CPUThreadPoolExecutor
支持多种任务队列类型,例如 FIFO 队列、优先级队列等,可以根据不同的任务特性选择合适的队列类型。
④ 可定制的线程工厂:可以通过自定义线程工厂来创建线程池中的线程,例如设置线程名称、优先级、栈大小等属性。
⑤ 完善的监控指标:CPUThreadPoolExecutor
提供了丰富的监控指标,例如活跃线程数、任务队列长度、已完成任务数、任务执行时间等,可以方便地监控线程池的运行状态和性能。
⑥ 支持任务优先级:CPUThreadPoolExecutor
可以支持任务优先级,允许高优先级任务优先执行,保证关键任务的及时处理。
⑦ 异常处理机制:CPUThreadPoolExecutor
提供了异常处理机制,可以捕获任务执行过程中抛出的异常,并进行相应的处理,例如记录日志、重试任务等。
总结,folly::CPUThreadPoolExecutor
是 folly
库提供的专门用于 CPU 密集型任务的高性能线程池实现。它具有低延迟、高吞吐量、灵活配置、完善的监控指标等特点,适用于对性能要求较高的 CPU 密集型并发场景。在后续章节中,我们将深入探讨 CPUThreadPoolExecutor
的核心概念、使用方法、高级特性和应用场景,帮助读者全面掌握 CPUThreadPoolExecutor
的使用技巧,并将其应用到实际的并发编程项目中。
END_OF_CHAPTER
2. chapter 2: folly::CPUThreadPoolExecutor
核心概念与基础使用
2.1 CPUThreadPoolExecutor
的设计哲学(Design Philosophy of CPUThreadPoolExecutor
)
folly::CPUThreadPoolExecutor
并非横空出世,它的设计哲学深深根植于解决实际工程问题和对高效并发编程的深刻理解。理解其设计哲学,有助于我们更好地运用它,并在遇到问题时能够从容应对。
① 专注于 CPU 密集型任务(Focus on CPU-bound Tasks):顾名思义,CPUThreadPoolExecutor
主要设计用来处理 CPU 密集型任务(CPU-bound tasks)。这类任务的特点是需要大量的 CPU 计算,例如图像处理、数值计算、编解码等。线程池的大小通常会设置为接近 CPU 核心数,以最大化 CPU 的利用率,减少线程上下文切换的开销。这与 I/O 密集型任务(I/O-bound tasks)有所不同,后者通常需要更多的线程来等待 I/O 操作完成。
② 高性能与低延迟(High Performance and Low Latency):folly
库本身就以高性能著称,CPUThreadPoolExecutor
自然也继承了这一基因。它在设计上力求减少不必要的开销,例如锁竞争、上下文切换等,以提供尽可能高的吞吐量和低延迟。这对于构建高性能服务至关重要。
③ 灵活的可配置性(Flexible Configurability):CPUThreadPoolExecutor
提供了丰富的配置选项,允许用户根据具体的应用场景进行定制。例如,可以配置线程池的大小、任务队列的类型、线程工厂等。这种灵活性使得 CPUThreadPoolExecutor
能够适应各种不同的负载和性能需求。
④ 易用性与可维护性(Ease of Use and Maintainability):尽管功能强大,CPUThreadPoolExecutor
在 API 设计上力求简洁易用。核心接口清晰明了,方便开发者快速上手。同时,良好的代码结构和完善的文档也保证了其可维护性,降低了长期维护的成本。
⑤ 与 folly
库的深度整合(Deep Integration with folly
Library):CPUThreadPoolExecutor
与 folly
库的其他组件,如 Future
、Promise
、Function
等,有着良好的整合。这使得开发者可以方便地利用 folly
库提供的其他工具来构建更复杂、更强大的并发应用。例如,可以使用 folly::Future
来管理异步任务的结果,使用 folly::Function
来封装任务逻辑。
总而言之,folly::CPUThreadPoolExecutor
的设计哲学可以概括为:为 CPU 密集型任务提供高性能、低延迟、灵活可配置且易于使用的线程池解决方案,并与 folly
库生态系统深度整合。理解这些设计哲学,有助于我们更好地理解和使用 CPUThreadPoolExecutor
,并在实际应用中做出更明智的选择。
2.2 CPUThreadPoolExecutor
的基本架构(Basic Architecture of CPUThreadPoolExecutor
)
folly::CPUThreadPoolExecutor
的基本架构遵循经典的线程池模式,其核心目标是将任务的提交和执行解耦,从而提高并发处理能力和资源利用率。下图展示了 CPUThreadPoolExecutor
的基本架构:
\[ \begin{tikzcd} \text{客户端 (Client)} \arrow[r, "提交任务 (Submit Task)"] & \text{任务队列 (Task Queue)} \arrow[r] & \text{线程池 (Thread Pool)} \arrow[loop right, "执行任务 (Execute Task)"] \\ & & \text{工作线程 (Worker Threads)} \arrow[u, "从队列获取任务 (Get Task from Queue)"] \end{tikzcd} \]
从上图可以看出,CPUThreadPoolExecutor
的架构主要包含以下几个核心组件:
① 任务队列(Task Queue):任务队列用于存储待执行的任务。当客户端提交一个任务时,任务首先会被放入任务队列中。CPUThreadPoolExecutor
可以配置不同的任务队列类型,例如 FIFO 队列(先进先出队列)、优先级队列等,以满足不同的调度需求。任务队列通常需要是线程安全的,以支持多线程的并发访问。
② 线程池(Thread Pool):线程池是由一组工作线程组成的集合。这些工作线程会不断地从任务队列中取出任务并执行。线程池的大小(即工作线程的数量)是线程池性能的关键参数,需要根据具体的应用场景和硬件环境进行合理的配置。
③ 工作线程(Worker Threads):工作线程是线程池中真正执行任务的线程。每个工作线程都维护一个循环,不断地从任务队列中获取任务,执行任务,然后继续获取下一个任务。工作线程的生命周期通常由线程池管理,避免了频繁创建和销毁线程的开销。
④ 线程工厂(Thread Factory)(可选):线程工厂用于创建新的工作线程。CPUThreadPoolExecutor
允许用户自定义线程工厂,以便对工作线程的创建过程进行更精细的控制,例如设置线程的名称、优先级、栈大小等。
工作流程简述:
- 任务提交:客户端通过
CPUThreadPoolExecutor
提供的接口(例如add()
方法)提交任务。任务会被封装成特定的任务对象,并放入任务队列中。 - 任务调度:线程池中的工作线程会不断地从任务队列中获取任务。具体的获取方式取决于任务队列的类型和调度策略。
- 任务执行:工作线程获取到任务后,会执行任务的
run()
方法(或其他类似的方法)来完成任务的逻辑。 - 结果返回(可选):如果任务需要返回结果,可以通过
folly::Future
和folly::Promise
等机制将结果返回给客户端。 - 线程池管理:
CPUThreadPoolExecutor
负责管理线程池的生命周期,包括线程的创建、启动、停止、销毁等。同时,它还提供了一些监控和管理接口,例如获取线程池的状态、调整线程池的大小等。
理解 CPUThreadPoolExecutor
的基本架构,有助于我们更好地理解其工作原理,并在后续章节中深入学习其核心组件、配置选项和高级应用。
2.3 核心组件详解(Detailed Explanation of Core Components)
CPUThreadPoolExecutor
的高效性和灵活性很大程度上取决于其核心组件的设计和实现。本节将深入剖析 CPUThreadPoolExecutor
的三个核心组件:工作线程、任务队列和线程工厂。
2.3.1 工作线程(Worker Thread)
工作线程是线程池中最核心的执行单元。在 CPUThreadPoolExecutor
中,工作线程的主要职责是从任务队列中取出任务并执行。为了高效地完成这项工作,工作线程的设计需要考虑以下几个关键点:
① 线程生命周期管理:CPUThreadPoolExecutor
负责管理工作线程的生命周期。通常情况下,工作线程在线程池启动时被创建,并在线程池停止时被销毁。这种线程复用机制避免了频繁创建和销毁线程的开销,提高了性能。
② 任务获取循环:每个工作线程都运行在一个循环中,不断地执行以下操作:
▮▮▮▮⚝ 从任务队列中获取任务:工作线程会尝试从任务队列中获取待执行的任务。如果任务队列为空,线程通常会被阻塞或进入等待状态,直到有新的任务到来。
▮▮▮▮⚝ 执行任务:获取到任务后,工作线程会执行任务的 run()
方法(或其他类似的方法)。任务的具体逻辑由用户代码定义。
▮▮▮▮⚝ 循环往复:任务执行完成后,工作线程会回到循环的开始,继续尝试从任务队列中获取下一个任务。
③ 线程本地存储(Thread-Local Storage, TLS):在某些场景下,工作线程可能需要维护一些线程私有的数据。CPUThreadPoolExecutor
允许使用线程本地存储来为每个工作线程关联一份独立的数据副本。这可以避免多线程并发访问共享数据时可能出现的竞争和同步问题。
④ 异常处理:工作线程在执行任务的过程中可能会遇到异常。CPUThreadPoolExecutor
需要提供机制来处理这些异常,例如捕获异常、记录日志、通知客户端等,以保证线程池的稳定性和可靠性。
⑤ 线程中断与停止:当需要停止线程池时,CPUThreadPoolExecutor
需要能够安全地中断或停止工作线程的执行。这通常涉及到设置线程的中断标志、发送中断信号等机制,并确保工作线程能够优雅地退出。
代码示例(伪代码,仅用于说明工作线程的执行逻辑):
1
void workerThreadFunction(TaskQueue& taskQueue) {
2
while (true) {
3
Task task = taskQueue.takeTask(); // 从任务队列中获取任务,可能会阻塞
4
if (task.isNull()) { // 检查是否收到停止信号
5
break; // 退出循环,线程结束
6
}
7
try {
8
task.run(); // 执行任务
9
} catch (const std::exception& e) {
10
// 异常处理逻辑,例如记录日志
11
std::cerr << "Task execution failed: " << e.what() << std::endl;
12
}
13
}
14
}
上述伪代码展示了一个简单的工作线程的执行逻辑。实际的 CPUThreadPoolExecutor
实现会更加复杂,需要考虑更多的细节,例如线程同步、条件变量、任务调度策略等。
2.3.2 任务队列(Task Queue)
任务队列是 CPUThreadPoolExecutor
中用于存储待执行任务的缓冲区。任务队列的设计直接影响着线程池的性能和行为。一个好的任务队列需要具备以下特点:
① 线程安全性(Thread Safety):任务队列需要支持多线程的并发访问,即允许多个线程同时向队列中添加任务,并允许多个工作线程同时从队列中获取任务。为了保证线程安全,通常需要使用互斥锁、条件变量、原子操作等同步机制。
② 高效的并发性能(Efficient Concurrency Performance):在高并发场景下,任务队列需要能够快速地处理大量的任务提交和获取操作,避免成为性能瓶颈。这通常需要选择合适的数据结构和同步算法,例如使用无锁队列、CAS(Compare-and-Swap)操作等。
③ 灵活的队列类型选择(Flexible Queue Type Selection):CPUThreadPoolExecutor
通常支持多种任务队列类型,以满足不同的应用场景需求。常见的队列类型包括:
▮▮▮▮⚝ FIFO 队列(FIFO Queue,First-In-First-Out):先进先出队列,任务按照提交的顺序被执行。这是最常见的队列类型,适用于大多数场景。
▮▮▮▮⚝ 优先级队列(Priority Queue):优先级队列允许为任务设置优先级,优先级高的任务会被优先执行。适用于需要区分任务重要程度的场景。
▮▮▮▮⚝ 有界队列(Bounded Queue):有界队列限制了队列的最大容量。当队列已满时,新的任务提交操作可能会被阻塞或拒绝。适用于需要控制任务积压的场景。
▮▮▮▮⚝ 无界队列(Unbounded Queue):无界队列没有容量限制,可以容纳任意数量的任务。但需要注意,如果任务提交速度过快,可能会导致内存溢出。
④ 公平性与调度策略(Fairness and Scheduling Policy):任务队列的调度策略决定了任务被工作线程获取的顺序。例如,FIFO 队列保证了任务的公平性,先提交的任务先被执行。优先级队列则根据任务的优先级进行调度。CPUThreadPoolExecutor
可能会提供配置选项来选择不同的调度策略。
⑤ 阻塞与非阻塞操作(Blocking and Non-blocking Operations):任务队列通常提供阻塞和非阻塞两种操作方式。例如,takeTask()
方法可能是阻塞的,当队列为空时,调用线程会被阻塞直到有新的任务到来;而 tryTakeTask()
方法可能是非阻塞的,当队列为空时,立即返回空值。
常见的任务队列实现:
⚝ std::queue
+ std::mutex
+ std::condition_variable
:使用标准库的队列、互斥锁和条件变量来实现线程安全的 FIFO 队列。实现简单,但性能可能受限于锁竞争。
⚝ folly::ConcurrentQueue
:folly
库提供的并发队列,基于无锁技术实现,具有更高的并发性能。
⚝ boost::lockfree::queue
:Boost
库提供的无锁队列,也具有较高的并发性能。
CPUThreadPoolExecutor
通常会允许用户配置或自定义任务队列的类型,以便根据具体的应用场景选择最合适的队列实现。
2.3.3 线程工厂(Thread Factory)
线程工厂是一个用于创建新线程的组件。在 CPUThreadPoolExecutor
中,线程工厂负责创建工作线程。虽然线程的创建看似简单,但在某些场景下,对线程的创建过程进行更精细的控制是非常有用的。线程工厂的主要作用包括:
① 自定义线程属性:线程工厂允许用户自定义创建线程的属性,例如:
▮▮▮▮⚝ 线程名称:为线程设置有意义的名称,方便在调试和监控时识别线程。
▮▮▮▮⚝ 线程优先级:设置线程的优先级,影响线程的调度顺序。
▮▮▮▮⚝ 线程栈大小:设置线程的栈大小,影响线程可以使用的栈空间。
▮▮▮▮⚝ 守护线程(Daemon Thread):设置线程是否为守护线程。
② 线程组管理:线程工厂可以将创建的线程放入特定的线程组中进行管理。线程组可以用于批量管理线程,例如统一设置线程的优先级、统一中断线程等。
③ 线程创建的钩子函数:线程工厂可以提供钩子函数,允许用户在线程创建前后执行一些自定义的操作。例如,在线程创建前初始化线程本地存储,在线程创建后记录线程创建日志等。
④ 解耦线程创建逻辑:使用线程工厂可以将线程的创建逻辑从 CPUThreadPoolExecutor
的核心代码中解耦出来,提高了代码的可维护性和可扩展性。用户可以根据需要自定义线程工厂,而无需修改 CPUThreadPoolExecutor
的核心代码。
folly
提供的线程工厂:
folly
库提供了一些默认的线程工厂实现,例如 std::thread::hardware_concurrency()
线程工厂,它会根据硬件并发级别创建相应数量的线程。用户也可以自定义线程工厂,实现更精细的线程创建控制。
代码示例(自定义线程工厂):
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/NamedThreadFactory.h>
3
#include <iostream>
4
5
class MyThreadFactory : public folly::NamedThreadFactory {
6
public:
7
MyThreadFactory(const std::string& name) : folly::NamedThreadFactory(name) {}
8
9
std::thread* newThread(folly::Func<void()> func) override {
10
std::cout << "Creating new thread with name: " << name() << std::endl;
11
std::thread* thread = folly::NamedThreadFactory::newThread(func);
12
// 可以设置线程优先级、栈大小等
13
return thread;
14
}
15
};
16
17
int main() {
18
// 使用自定义线程工厂创建 CPUThreadPoolExecutor
19
folly::CPUThreadPoolExecutor executor(
20
4, std::make_unique<MyThreadFactory>("MyWorkerThread"));
21
22
// ... 提交任务 ...
23
24
return 0;
25
}
上述代码示例展示了如何自定义一个线程工厂 MyThreadFactory
,并在创建线程时输出日志信息。通过自定义线程工厂,我们可以灵活地控制线程的创建过程,满足各种定制化的需求。
2.4 CPUThreadPoolExecutor
的创建与初始化(Creation and Initialization of CPUThreadPoolExecutor
)
创建和初始化 folly::CPUThreadPoolExecutor
是使用线程池的第一步。folly::CPUThreadPoolExecutor
提供了多种构造函数,允许用户根据不同的需求进行灵活的配置。
基本构造函数:
folly::CPUThreadPoolExecutor
最常用的构造函数如下:
1
folly::CPUThreadPoolExecutor(int numThreads, std::unique_ptr<ThreadFactory> threadFactory = std::make_unique<NamedThreadFactory>("CPUThreadPoolExecutor"));
⚝ numThreads
:指定线程池中工作线程的数量。线程数量的选择需要根据具体的应用场景和硬件环境进行权衡。对于 CPU 密集型任务,通常建议将线程数量设置为接近 CPU 核心数,例如 std::thread::hardware_concurrency()
。
⚝ threadFactory
(可选):指定线程工厂。用于创建工作线程。如果不指定,默认使用 folly::NamedThreadFactory
,它会为线程设置一个默认的名称 "CPUThreadPoolExecutor"。用户可以自定义线程工厂,例如使用 folly::NamedThreadFactory
设置自定义的线程名称,或者使用完全自定义的线程工厂实现更精细的线程创建控制。
代码示例:创建 CPUThreadPoolExecutor
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/NamedThreadFactory.h>
3
#include <iostream>
4
5
int main() {
6
// 创建一个包含 4 个工作线程的 CPUThreadPoolExecutor,使用默认线程工厂
7
folly::CPUThreadPoolExecutor executor1(4);
8
9
// 创建一个包含 8 个工作线程的 CPUThreadPoolExecutor,并自定义线程名称
10
folly::CPUThreadPoolExecutor executor2(
11
8, std::make_unique<folly::NamedThreadFactory>("MyThreadPool"));
12
13
// 创建一个包含硬件并发数个工作线程的 CPUThreadPoolExecutor
14
int numThreads = std::thread::hardware_concurrency();
15
folly::CPUThreadPoolExecutor executor3(numThreads);
16
17
std::cout << "ThreadPools created successfully!" << std::endl;
18
19
return 0;
20
}
初始化注意事项:
⚝ 线程数量的选择:线程数量是影响线程池性能的关键参数。过少的线程数量可能无法充分利用 CPU 资源,导致任务处理速度慢;过多的线程数量可能会增加线程上下文切换的开销,甚至导致性能下降。合理的线程数量需要根据具体的应用场景和硬件环境进行测试和调优。
⚝ 线程工厂的选择:默认的 folly::NamedThreadFactory
适用于大多数场景。如果需要自定义线程属性或线程创建逻辑,可以考虑自定义线程工厂。
⚝ 线程池的生命周期管理:CPUThreadPoolExecutor
对象通常需要在程序结束前被销毁,以释放资源。可以使用智能指针(例如 std::unique_ptr
)来管理 CPUThreadPoolExecutor
对象的生命周期,确保在不再使用时自动释放资源。
2.5 任务提交与执行(Task Submission and Execution)
创建并初始化 CPUThreadPoolExecutor
后,就可以向线程池提交任务并执行了。CPUThreadPoolExecutor
提供了 add()
方法来提交任务,并使用 start()
和 stop()
方法来控制线程池的启动和停止。
2.5.1 add()
方法:提交任务(Submit Task with add()
)
add()
方法用于向 CPUThreadPoolExecutor
提交一个任务。任务通常以 folly::Func
对象的形式传递给 add()
方法。folly::Func
是 folly
库提供的一个通用的函数对象封装器,可以封装各种可调用对象,例如普通函数、Lambda 表达式、成员函数等。
add()
方法的签名:
1
template <typename Func, typename... Args>
2
auto add(Func&& func, Args&&... args) -> folly::Future<folly::Unit>;
⚝ Func&& func
:要执行的任务函数对象,可以是任何可调用对象。
⚝ Args&&... args
:传递给任务函数对象的参数列表。
⚝ 返回值 folly::Future<folly::Unit>
:返回一个 folly::Future
对象,代表任务的执行结果。folly::Unit
表示任务没有返回值(void 返回类型)。如果任务有返回值,可以使用其他类型的 folly::Future
,例如 folly::Future<int>
、folly::Future<std::string>
等。
代码示例:使用 add()
方法提交任务
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
4
void myTask(int taskId) {
5
std::cout << "Task " << taskId << " is running in thread: " << std::this_thread::get_id() << std::endl;
6
// 模拟任务执行时间
7
std::this_thread::sleep_for(std::chrono::milliseconds(100));
8
std::cout << "Task " << taskId << " finished." << std::endl;
9
}
10
11
int main() {
12
folly::CPUThreadPoolExecutor executor(4);
13
14
// 提交 5 个任务
15
for (int i = 1; i <= 5; ++i) {
16
executor.add(myTask, i); // 提交任务 myTask,并传递参数 i
17
}
18
19
// ... 后续操作 ...
20
21
return 0;
22
}
在上述示例中,我们定义了一个简单的任务函数 myTask
,然后使用 executor.add(myTask, i)
提交了 5 个任务。add()
方法会将任务封装成 folly::Func
对象,并放入任务队列中,等待工作线程执行。
folly::Future
的作用:
add()
方法返回的 folly::Future<folly::Unit>
对象可以用于:
⚝ 同步等待任务完成:可以使用 future.wait()
方法阻塞当前线程,直到任务执行完成。
⚝ 获取任务执行结果(如果任务有返回值):可以使用 future.get()
方法获取任务的返回值。
⚝ 链式调用:可以使用 future.then()
、future.error()
等方法进行链式调用,实现更复杂的异步操作。
在基础使用场景中,我们通常不需要显式地处理 folly::Future
对象,只需要提交任务即可。在后续章节中,我们将深入探讨 folly::Future
的高级用法。
2.5.2 start()
方法:启动线程池(Start Thread Pool with start()
)
CPUThreadPoolExecutor
默认情况下在创建后并不会立即启动工作线程。需要显式地调用 start()
方法来启动线程池,开始接受和执行任务。
start()
方法的签名:
1
void start();
start()
方法没有参数,也没有返回值。它会启动线程池中的所有工作线程,使其开始从任务队列中获取任务并执行。
代码示例:启动线程池
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
4
void myTask(int taskId) {
5
std::cout << "Task " << taskId << " is running in thread: " << std::this_thread::get_id() << std::endl;
6
std::this_thread::sleep_for(std::chrono::milliseconds(100));
7
std::cout << "Task " << taskId << " finished." << std::endl;
8
}
9
10
int main() {
11
folly::CPUThreadPoolExecutor executor(4);
12
13
// 提交任务(在 start() 之前提交任务是可以的,任务会先放入队列)
14
for (int i = 1; i <= 2; ++i) {
15
executor.add(myTask, i);
16
}
17
18
// 启动线程池
19
executor.start();
20
21
// 再次提交任务(在 start() 之后提交任务也是可以的)
22
for (int i = 3; i <= 4; ++i) {
23
executor.add(myTask, i);
24
}
25
26
// 等待一段时间,确保任务执行完成
27
std::this_thread::sleep_for(std::chrono::seconds(1));
28
29
return 0;
30
}
在上述示例中,我们在提交任务之后,调用了 executor.start()
方法来启动线程池。需要注意的是,即使在 start()
方法之前提交了任务,这些任务也会被正确地放入任务队列中,并在线程池启动后被执行。
start()
方法的调用时机:
⚝ 通常情况下,建议在提交完所有初始任务后再调用 start()
方法。
⚝ 可以多次调用 start()
方法,但只有第一次调用会真正启动线程池,后续调用会被忽略。
2.5.3 stop()
方法:停止线程池(Stop Thread Pool with stop()
)
当不再需要线程池时,或者程序即将结束时,需要调用 stop()
方法来停止线程池,释放资源。stop()
方法会平滑地停止线程池,等待正在执行的任务完成后再退出,并阻止新的任务提交。
stop()
方法的签名:
1
void stop();
stop()
方法没有参数,也没有返回值。它会执行以下操作:
- 设置线程池状态为停止:阻止新的任务提交。
- 向工作线程发送停止信号:通知工作线程停止从任务队列中获取新的任务。
- 等待所有正在执行的任务完成:
stop()
方法会阻塞当前线程,直到所有正在执行的任务都完成。 - 销毁工作线程:任务完成后,销毁线程池中的所有工作线程。
代码示例:停止线程池
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
4
void myTask(int taskId) {
5
std::cout << "Task " << taskId << " is running in thread: " << std::this_thread::get_id() << std::endl;
6
std::this_thread::sleep_for(std::chrono::milliseconds(100));
7
std::cout << "Task " << taskId << " finished." << std::endl;
8
}
9
10
int main() {
11
folly::CPUThreadPoolExecutor executor(4);
12
executor.start(); // 启动线程池
13
14
for (int i = 1; i <= 5; ++i) {
15
executor.add(myTask, i);
16
}
17
18
std::cout << "Stopping thread pool..." << std::endl;
19
executor.stop(); // 停止线程池,等待任务完成
20
21
std::cout << "Thread pool stopped." << std::endl;
22
23
return 0;
24
}
在上述示例中,我们在提交任务后,调用了 executor.stop()
方法来停止线程池。stop()
方法会等待所有已提交的任务执行完成后再返回,确保程序安全退出。
stop()
方法的调用时机:
⚝ 通常在程序即将结束时,或者不再需要线程池时调用 stop()
方法。
⚝ 确保在 stop()
方法调用之后,不再向线程池提交新的任务。
⚝ 可以多次调用 stop()
方法,但只有第一次调用会真正执行停止操作,后续调用会被忽略。
shutdown()
方法:
除了 stop()
方法,CPUThreadPoolExecutor
还提供了 shutdown()
方法,其功能与 stop()
方法类似,也是用于停止线程池。在大多数情况下,stop()
和 shutdown()
方法可以互换使用。
2.6 基础代码示例:快速上手 CPUThreadPoolExecutor
(Basic Code Example: Getting Started with CPUThreadPoolExecutor
)
为了帮助读者快速上手 folly::CPUThreadPoolExecutor
,本节提供一个完整的、可编译运行的基础代码示例,演示了 CPUThreadPoolExecutor
的创建、启动、任务提交和停止的基本用法。
代码示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/NamedThreadFactory.h>
3
#include <iostream>
4
#include <chrono>
5
#include <thread>
6
7
void helloTask(int taskId) {
8
std::cout << "Hello from task " << taskId << ", thread id: " << std::this_thread::get_id() << std::endl;
9
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟任务执行时间
10
}
11
12
int main() {
13
// 1. 创建 CPUThreadPoolExecutor,指定线程数量和线程工厂
14
int numThreads = 4;
15
auto threadFactory = std::make_unique<folly::NamedThreadFactory>("MyWorkerThread");
16
folly::CPUThreadPoolExecutor executor(numThreads, std::move(threadFactory));
17
18
// 2. 启动线程池
19
executor.start();
20
std::cout << "Thread pool started with " << numThreads << " threads." << std::endl;
21
22
// 3. 提交多个任务
23
int numTasks = 10;
24
std::cout << "Submitting " << numTasks << " tasks..." << std::endl;
25
for (int i = 1; i <= numTasks; ++i) {
26
executor.add(helloTask, i);
27
}
28
29
// 4. 等待一段时间,确保任务执行完成 (实际应用中可以使用 Future 进行更精确的同步)
30
std::this_thread::sleep_for(std::chrono::seconds(2));
31
32
// 5. 停止线程池
33
std::cout << "Stopping thread pool..." << std::endl;
34
executor.stop();
35
std::cout << "Thread pool stopped." << std::endl;
36
37
std::cout << "Example finished." << std::endl;
38
return 0;
39
}
编译和运行:
- 确保已安装
folly
库:请参考folly
官方文档进行安装。 - 使用支持 C++17 或更高版本的编译器编译代码:例如
g++
或clang++
。 - 编译命令示例(假设代码文件名为
basic_example.cpp
):
1
g++ -std=c++17 basic_example.cpp -o basic_example -lfolly -lpthread
- 运行可执行文件:
1
./basic_example
代码说明:
⚝ 代码首先创建了一个 CPUThreadPoolExecutor
对象,指定了 4 个工作线程,并使用了自定义名称的线程工厂。
⚝ 然后,调用 start()
方法启动线程池。
⚝ 接着,循环提交了 10 个简单的 helloTask
任务。
⚝ 使用 std::this_thread::sleep_for()
简单地等待一段时间,确保任务执行完成。在实际应用中,可以使用 folly::Future
进行更精确的任务同步和结果获取。
⚝ 最后,调用 stop()
方法停止线程池。
运行上述代码,你将看到类似以下的输出,表明任务被线程池中的工作线程并发执行:
1
Thread pool started with 4 threads.
2
Submitting 10 tasks...
3
Hello from task 1, thread id: 140737317883648
4
Hello from task 2, thread id: 140737326279424
5
Hello from task 3, thread id: 140737334675200
6
Hello from task 4, thread id: 140737343070976
7
Hello from task 5, thread id: 140737317883648
8
Hello from task 6, thread id: 140737326279424
9
Hello from task 7, thread id: 140737334675200
10
Hello from task 8, thread id: 140737343070976
11
Hello from task 9, thread id: 140737317883648
12
Hello from task 10, thread id: 140737326279424
13
Stopping thread pool...
14
Thread pool stopped.
15
Example finished.
通过这个简单的示例,读者可以快速了解 folly::CPUThreadPoolExecutor
的基本使用流程,为后续深入学习和应用打下基础。
END_OF_CHAPTER
3. chapter 3: folly::CPUThreadPoolExecutor
进阶特性与配置
3.1 线程池大小动态调整(Dynamic Adjustment of Thread Pool Size)
线程池的核心优势之一在于能够有效地管理和复用线程,从而避免频繁创建和销毁线程的开销。然而,固定的线程池大小在面对动态变化的工作负载时可能显得不够灵活。当任务负载 अचानक 增加时,固定大小的线程池可能成为瓶颈,导致任务排队等待,响应延迟增加;而当负载降低时,过多的空闲线程又会浪费系统资源。因此,线程池大小动态调整(Dynamic Adjustment of Thread Pool Size) 成为一项重要的进阶特性,它允许线程池根据实际负载情况自动伸缩线程数量,以达到最佳的性能和资源利用率。
folly::CPUThreadPoolExecutor
本身不直接提供自动动态调整线程池大小的机制。其设计哲学更倾向于由开发者根据应用场景和性能需求,预先配置合适的线程池大小。这主要是因为线程池的动态调整涉及到复杂的策略制定,例如何时扩容、何时缩容、扩容和缩容的步长等,这些策略往往与具体的应用场景和性能指标紧密相关。将动态调整的策略交给开发者,可以提供更大的灵活性和控制力。
尽管 CPUThreadPoolExecutor
没有内置的自动动态调整功能,但开发者可以基于其提供的 API 和监控指标,手动实现动态调整线程池大小的功能。以下是一些实现动态调整的思路和方法:
① 监控线程池指标: CPUThreadPoolExecutor
提供了多种方法来获取线程池的运行状态,例如:
▮▮▮▮ⓑ 任务队列大小(Task Queue Size): 反映了当前待执行任务的积压程度。当任务队列持续增长并超过一定阈值时,可能意味着线程池处理能力不足,需要扩容。
▮▮▮▮ⓒ 活跃线程数(Active Thread Count): 表示当前正在执行任务的线程数量。如果活跃线程数长时间接近或达到线程池的最大线程数,但任务队列仍然很长,也可能需要扩容。
▮▮▮▮ⓓ 任务完成速率(Task Completion Rate): 监控单位时间内完成的任务数量。如果任务完成速率下降,可能意味着线程池遇到了瓶颈,需要调整。
② 制定动态调整策略: 基于监控指标,可以制定相应的动态调整策略。例如:
▮▮▮▮ⓑ 基于队列长度的策略: 当任务队列长度超过预设阈值时,增加线程池大小;当队列长度长时间低于某个阈值时,减小线程池大小。
▮▮▮▮ⓒ 基于 CPU 利用率的策略: 监控系统的 CPU 利用率。当 CPU 利用率较低且任务队列较长时,可以适当增加线程池大小,以提高 CPU 利用率和任务处理能力。
▮▮▮▮ⓓ 基于响应时间的策略: 监控任务的平均响应时间。当响应时间超过可接受范围时,增加线程池大小;当响应时间远低于要求时,可以考虑缩减线程池大小。
③ 手动调整线程池大小: 虽然 CPUThreadPoolExecutor
没有直接提供动态调整大小的 API,但可以通过重新创建线程池的方式来间接实现。具体步骤如下:
▮▮▮▮ⓑ 停止当前的线程池: 使用 stop()
方法安全地停止当前的 CPUThreadPoolExecutor
,确保所有正在执行的任务完成,并拒绝新的任务提交。
▮▮▮▮ⓒ 创建新的线程池: 根据动态调整策略,创建一个具有新的线程池大小的 CPUThreadPoolExecutor
实例。
▮▮▮▮ⓓ 切换到新的线程池: 将后续的任务提交到新的线程池中。
代码示例(伪代码):
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
3
#include <iostream>
4
#include <chrono>
5
#include <thread>
6
7
using namespace folly;
8
using namespace std;
9
10
int main() {
11
// 初始线程池大小
12
size_t initialPoolSize = 4;
13
// 最大线程池大小
14
size_t maxPoolSize = 16;
15
// 当前线程池大小
16
size_t currentPoolSize = initialPoolSize;
17
18
// 创建线程池
19
CPUThreadPoolExecutor executor(currentPoolSize, make_unique<LinkedBlockingQueue>());
20
executor.start();
21
22
// 模拟负载监控和动态调整
23
for (int i = 0; i < 10; ++i) {
24
// 模拟任务提交
25
for (int j = 0; j < 20; ++j) {
26
executor.add([j]() {
27
this_thread::sleep_for(chrono::milliseconds(100));
28
cout << "Task " << j << " executed by thread " << this_thread::get_id() << endl;
29
});
30
}
31
32
// 模拟监控任务队列大小 (实际应用中需要更精确的监控)
33
size_t queueSize = executor.getTaskQueueSize();
34
cout << "Current task queue size: " << queueSize << endl;
35
36
// 简单的动态调整策略:如果队列大小超过阈值,则增加线程池大小
37
size_t queueThreshold = 10;
38
if (queueSize > queueThreshold && currentPoolSize < maxPoolSize) {
39
currentPoolSize = min(currentPoolSize * 2, maxPoolSize); // 简单地将线程池大小翻倍,但不超过最大值
40
cout << "Resizing thread pool to: " << currentPoolSize << endl;
41
42
// 停止旧的线程池
43
executor.stop();
44
45
// 创建新的线程池
46
executor = CPUThreadPoolExecutor(currentPoolSize, make_unique<LinkedBlockingQueue>());
47
executor.start();
48
}
49
50
this_thread::sleep_for(chrono::seconds(1)); // 模拟监控间隔
51
}
52
53
executor.stop();
54
return 0;
55
}
总结:
⚝ folly::CPUThreadPoolExecutor
本身不提供自动动态调整线程池大小的功能。
⚝ 开发者可以基于其提供的监控指标和 API,手动实现动态调整策略。
⚝ 动态调整策略需要根据具体的应用场景和性能需求进行定制。
⚝ 手动调整线程池大小可以通过停止旧线程池并创建新线程池的方式实现。
⚝ 动态调整线程池大小是一项高级特性,需要谨慎设计和测试,以避免频繁的线程池重建带来的额外开销。
3.2 任务优先级(Task Priority)管理
在某些应用场景中,并非所有的任务都具有相同的紧急程度或重要性。任务优先级(Task Priority)管理 允许线程池根据任务的优先级来调度执行顺序,确保高优先级的任务能够更快地得到处理,从而提升系统的响应速度和整体性能。例如,在实时系统中,处理用户交互的任务可能需要比后台数据同步任务更高的优先级。
folly::CPUThreadPoolExecutor
本身不直接支持任务优先级管理。默认情况下,任务按照提交的顺序(FIFO - 先进先出)进入任务队列,并由线程池中的线程按照入队顺序执行。
然而,为了实现任务优先级,开发者可以自定义任务队列,并结合 CPUThreadPoolExecutor
使用。通过使用优先级队列(Priority Queue) 作为 CPUThreadPoolExecutor
的任务队列,可以实现基于优先级的任务调度。
实现思路:
① 自定义优先级任务类: 首先,需要定义一个包含优先级信息的任务类。这个类需要能够比较优先级,以便优先级队列能够正确排序。
1
#include <folly/Function.h>
2
3
struct PriorityTask {
4
int priority;
5
folly::Func<void()> task; // 使用 folly::Func 封装任务
6
7
PriorityTask(int p, folly::Func<void()> t) : priority(p), task(std::move(t)) {}
8
9
// 定义优先级比较,例如,数字越小优先级越高
10
bool operator>(const PriorityTask& other) const {
11
return priority > other.priority;
12
}
13
};
② 使用 std::priority_queue
作为任务队列: std::priority_queue
是 C++ 标准库提供的优先级队列容器。可以将其作为 CPUThreadPoolExecutor
的任务队列。需要注意的是,std::priority_queue
默认是最大堆,为了实现数字越小优先级越高,需要自定义比较函数或者重载 operator>
。
1
#include <queue>
2
#include <memory>
3
#include <folly/executors/task_queue/CustomTaskQueue.h>
4
5
// 自定义优先级任务队列
6
class PriorityBlockingQueue : public folly::CustomTaskQueue {
7
public:
8
PriorityBlockingQueue() = default;
9
10
void add(folly::Func<void()> func) override {
11
// 默认优先级为 0,可以根据实际情况设置
12
queue_.push(PriorityTask(0, std::move(func)));
13
}
14
15
void addWithPriority(int priority, folly::Func<void()> func) {
16
queue_.push(PriorityTask(priority, std::move(func)));
17
}
18
19
folly::Func<void()> take() override {
20
std::unique_lock<std::mutex> lock(mutex_);
21
cv_.wait(lock, [this] { return !queue_.empty() || isShutdown(); });
22
if (isShutdown() && queue_.empty()) {
23
return nullptr; // 线程池已停止且队列为空
24
}
25
PriorityTask task = queue_.top();
26
queue_.pop();
27
return task.task;
28
}
29
30
size_t size() const override {
31
std::lock_guard<std::mutex> lock(mutex_);
32
return queue_.size();
33
}
34
35
bool empty() const override {
36
std::lock_guard<std::mutex> lock(mutex_);
37
return queue_.empty();
38
}
39
40
private:
41
std::priority_queue<PriorityTask, std::vector<PriorityTask>, std::greater<PriorityTask>> queue_; // 使用 std::priority_queue
42
std::mutex mutex_;
43
std::condition_variable cv_;
44
};
③ 创建 CPUThreadPoolExecutor
时使用自定义优先级队列: 在创建 CPUThreadPoolExecutor
实例时,使用自定义的 PriorityBlockingQueue
作为任务队列。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <thread>
4
5
using namespace folly;
6
using namespace std;
7
8
int main() {
9
// 创建使用优先级队列的线程池
10
CPUThreadPoolExecutor executor(4, make_unique<PriorityBlockingQueue>());
11
executor.start();
12
13
// 提交带优先级的任务
14
auto& priorityQueue = dynamic_cast<PriorityBlockingQueue&>(executor.getTaskQueue()); // 获取自定义队列的引用
15
16
priorityQueue.addWithPriority(2, []() {
17
cout << "Low priority task executed by thread " << this_thread::get_id() << endl;
18
this_thread::sleep_for(chrono::milliseconds(200));
19
});
20
21
priorityQueue.addWithPriority(0, []() {
22
cout << "High priority task executed by thread " << this_thread::get_id() << endl;
23
this_thread::sleep_for(chrono::milliseconds(100));
24
});
25
26
priorityQueue.addWithPriority(1, []() {
27
cout << "Medium priority task executed by thread " << this_thread::get_id() << endl;
28
this_thread::sleep_for(chrono::milliseconds(150));
29
});
30
31
// 等待任务完成 (实际应用中需要更完善的等待机制)
32
this_thread::sleep_for(chrono::seconds(2));
33
34
executor.stop();
35
return 0;
36
}
注意事项:
⚝ 优先级反转(Priority Inversion): 使用优先级队列时需要注意优先级反转问题。当低优先级任务持有高优先级任务需要的资源时,可能导致高优先级任务被阻塞,从而降低优先级调度的效果。需要采取相应的措施来避免优先级反转,例如优先级继承或优先级天花板协议。
⚝ 优先级饥饿(Priority Starvation): 如果持续有高优先级任务提交,可能会导致低优先级任务长时间得不到执行,发生优先级饥饿。需要合理设置优先级策略,避免过度偏向高优先级任务。
⚝ 性能开销: 优先级队列的插入和删除操作通常比普通队列略慢,会带来一定的性能开销。在对性能要求非常高的场景中,需要权衡优先级调度带来的收益和性能开销。
总结:
⚝ folly::CPUThreadPoolExecutor
默认不支持任务优先级管理。
⚝ 可以通过自定义任务队列,使用 std::priority_queue
实现优先级调度。
⚝ 需要自定义优先级任务类和优先级队列,并重写 add
和 take
方法。
⚝ 使用优先级调度时需要注意优先级反转和优先级饥饿问题,并权衡性能开销。
⚝ 任务优先级管理适用于需要区分任务重要性和紧急程度的应用场景。
3.3 自定义任务队列(Custom Task Queue)
folly::CPUThreadPoolExecutor
提供了高度的灵活性,允许开发者自定义任务队列(Custom Task Queue),以满足各种特殊的任务调度和管理需求。默认情况下,CPUThreadPoolExecutor
使用 LinkedBlockingQueue
作为任务队列,这是一种基于链表的无界阻塞队列,适用于大多数通用场景。然而,在某些特定场景下,可能需要使用不同类型的队列,例如:
⚝ 有界队列(Bounded Queue): 限制任务队列的最大长度,防止任务积压过多导致内存溢出。
⚝ 优先级队列(Priority Queue): 如上一节所述,用于实现基于优先级的任务调度。
⚝ 延迟队列(Delay Queue): 任务在提交后需要延迟一段时间才能被执行。
⚝ LIFO 队列(Stack-like Queue): 后进先出队列,适用于某些特定的任务处理模式。
⚝ 更高效的数据结构: 针对特定 workload,可能存在比 LinkedBlockingQueue
更高效的队列实现。
folly::CPUThreadPoolExecutor
通过 CustomTaskQueue
抽象基类,为自定义任务队列提供了接口。开发者只需要继承 CustomTaskQueue
类,并实现其抽象方法,就可以创建自定义的任务队列,并将其应用于 CPUThreadPoolExecutor
。
CustomTaskQueue
抽象基类:
folly::CustomTaskQueue
定义了任务队列需要实现的基本接口:
1
class CustomTaskQueue {
2
public:
3
virtual ~CustomTaskQueue() = default;
4
5
// 添加任务到队列
6
virtual void add(Func<void()> func) = 0;
7
8
// 从队列中获取任务,阻塞等待直到有任务可用或线程池停止
9
virtual Func<void()> take() = 0;
10
11
// 获取队列当前大小
12
virtual size_t size() const = 0;
13
14
// 判断队列是否为空
15
virtual bool empty() const = 0;
16
17
// 标记线程池已停止,用于唤醒等待在 take() 的线程
18
virtual void shutdown() noexcept;
19
20
// 判断线程池是否已停止
21
virtual bool isShutdown() const noexcept;
22
23
protected:
24
CustomTaskQueue() noexcept = default;
25
26
private:
27
std::atomic<bool> shutdown_{false};
28
};
自定义任务队列的步骤:
① 继承 CustomTaskQueue
类: 创建一个新的类,继承自 folly::CustomTaskQueue
。
② 实现抽象方法: 实现 add()
, take()
, size()
, empty()
这四个纯虚函数,根据需要选择性地实现 shutdown()
和 isShutdown()
。
③ 在 take()
方法中处理线程池停止信号: 在 take()
方法中,需要检查 isShutdown()
的返回值。如果线程池已经停止,并且队列为空,则应该返回 nullptr
,通知工作线程退出。
④ 创建 CPUThreadPoolExecutor
时使用自定义队列: 在创建 CPUThreadPoolExecutor
实例时,将自定义任务队列的 unique_ptr
传递给构造函数。
代码示例:自定义有界阻塞队列
1
#include <folly/executors/task_queue/CustomTaskQueue.h>
2
#include <folly/Function.h>
3
#include <queue>
4
#include <mutex>
5
#include <condition_variable>
6
#include <atomic>
7
8
namespace folly {
9
10
class BoundedBlockingQueue : public CustomTaskQueue {
11
public:
12
explicit BoundedBlockingQueue(size_t capacity) : capacity_(capacity) {
13
if (capacity_ == 0) {
14
throw std::invalid_argument("BoundedBlockingQueue capacity must be positive");
15
}
16
}
17
18
void add(Func<void()> func) override {
19
std::unique_lock<std::mutex> lock(mutex_);
20
// 如果队列已满,则阻塞等待直到有空间
21
cv_not_full_.wait(lock, [this] { return queue_.size() < capacity_ || isShutdown(); });
22
if (isShutdown()) {
23
return; // 线程池已停止,不再添加任务
24
}
25
queue_.push(std::move(func));
26
cv_not_empty_.notify_one(); // 通知等待在 take() 的线程
27
}
28
29
Func<void()> take() override {
30
std::unique_lock<std::mutex> lock(mutex_);
31
cv_not_empty_.wait(lock, [this] { return !queue_.empty() || isShutdown(); });
32
if (isShutdown() && queue_.empty()) {
33
return nullptr; // 线程池已停止且队列为空
34
}
35
Func<void()> func = queue_.front();
36
queue_.pop();
37
cv_not_full_.notify_one(); // 通知等待在 add() 的线程
38
return func;
39
}
40
41
size_t size() const override {
42
std::lock_guard<std::mutex> lock(mutex_);
43
return queue_.size();
44
}
45
46
bool empty() const override {
47
std::lock_guard<std::mutex> lock(mutex_);
48
return queue_.empty();
49
}
50
51
private:
52
size_t capacity_;
53
std::queue<Func<void()>> queue_;
54
std::mutex mutex_;
55
std::condition_variable cv_not_empty_;
56
std::condition_variable cv_not_full_;
57
};
58
59
} // namespace folly
60
61
62
#include <folly/executors/CPUThreadPoolExecutor.h>
63
#include <iostream>
64
#include <thread>
65
66
using namespace folly;
67
using namespace std;
68
69
int main() {
70
// 创建使用有界队列的线程池,队列容量为 5
71
CPUThreadPoolExecutor executor(4, make_unique<BoundedBlockingQueue>(5));
72
executor.start();
73
74
// 提交任务,当队列满时,add() 操作会阻塞
75
for (int i = 0; i < 10; ++i) {
76
cout << "Adding task " << i << endl;
77
executor.add([i]() {
78
cout << "Task " << i << " executed by thread " << this_thread::get_id() << endl;
79
this_thread::sleep_for(chrono::milliseconds(100));
80
});
81
}
82
83
this_thread::sleep_for(chrono::seconds(2));
84
executor.stop();
85
return 0;
86
}
总结:
⚝ folly::CPUThreadPoolExecutor
允许自定义任务队列,提供了高度的灵活性。
⚝ 通过继承 folly::CustomTaskQueue
类并实现其抽象方法,可以创建自定义的任务队列。
⚝ 自定义任务队列可以满足各种特殊的任务调度和管理需求,例如有界队列、优先级队列、延迟队列等。
⚝ 在 take()
方法中需要正确处理线程池停止信号,确保线程能够正常退出。
⚝ 自定义任务队列是 CPUThreadPoolExecutor
的一项高级特性,可以根据具体的应用场景进行定制。
3.4 异常处理(Exception Handling)机制
在并发编程中,异常处理(Exception Handling) 尤为重要。线程池中的任务在执行过程中可能会抛出异常,如果不妥善处理,可能会导致程序崩溃、资源泄漏或其他不可预测的行为。folly::CPUThreadPoolExecutor
提供了机制来处理任务执行过程中抛出的异常,但其处理方式相对简单直接。
CPUThreadPoolExecutor
的异常处理方式:
当提交给 CPUThreadPoolExecutor
的任务(folly::Func<void()>
)在执行过程中抛出异常时,CPUThreadPoolExecutor
默认会捕获这些异常,但不会主动传播或重新抛出异常。 而是会忽略这些异常,并继续执行后续的任务。
这意味着,默认情况下,如果任务抛出异常,你不会直接在提交任务的代码中捕获到这个异常。 异常信息可能会被记录到日志中(如果 CPUThreadPoolExecutor
内部有日志记录机制,但 folly
库本身在这方面比较精简,通常需要开发者自行添加日志),但任务的执行结果会被视为失败,但不会影响线程池的正常运行。
如何获取任务执行异常:
由于 CPUThreadPoolExecutor
默认不传播异常,如果需要获取任务执行过程中的异常信息,需要开发者在任务代码中自行处理。 以下是一些常见的处理异常的方法:
① 在任务内部捕获异常并处理: 最常见的方法是在提交的任务函数内部使用 try-catch
块来捕获可能抛出的异常,并在 catch
块中进行相应的处理,例如:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
3
#include <iostream>
4
#include <stdexcept>
5
6
using namespace folly;
7
using namespace std;
8
9
int main() {
10
CPUThreadPoolExecutor executor(4, make_unique<LinkedBlockingQueue>());
11
executor.start();
12
13
executor.add([]() {
14
try {
15
// 模拟可能抛出异常的任务
16
throw runtime_error("Task failed with an exception!");
17
} catch (const exception& e) {
18
cerr << "Caught exception in task: " << e.what() << endl;
19
// 在这里进行异常处理,例如记录日志、清理资源等
20
}
21
});
22
23
executor.add([]() {
24
cout << "This task will still be executed." << endl;
25
});
26
27
this_thread::sleep_for(chrono::seconds(1));
28
executor.stop();
29
return 0;
30
}
② 使用 folly::Try<T>
返回值: 如果任务需要返回结果,并且希望能够显式地处理异常情况,可以使用 folly::Try<T>
类型作为任务的返回值。folly::Try<T>
可以表示一个操作的结果,它可以是成功的结果值 T
,也可以是失败的异常。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
3
#include <folly/Try.h>
4
#include <iostream>
5
#include <stdexcept>
6
7
using namespace folly;
8
using namespace std;
9
10
// 任务函数返回 folly::Try<int>
11
Try<int> taskWithResult() {
12
// 模拟可能抛出异常的任务
13
// throw runtime_error("Task failed with an exception!");
14
return 42; // 正常返回结果
15
}
16
17
int main() {
18
CPUThreadPoolExecutor executor(4, make_unique<LinkedBlockingQueue>());
19
executor.start();
20
21
auto futureResult = executor.addFuture([]() {
22
return taskWithResult(); // 使用 addFuture 提交返回 Future 的任务
23
});
24
25
// 获取任务结果
26
Try<int> result = futureResult.get();
27
28
if (result.hasException()) {
29
cerr << "Task failed with exception: " << result.exception().what() << endl;
30
} else {
31
cout << "Task result: " << result.value() << endl;
32
}
33
34
executor.stop();
35
return 0;
36
}
注意事项:
⚝ 异常安全(Exception Safety): 在编写并发代码时,务必考虑异常安全。确保在异常发生时,程序状态仍然保持一致,资源能够被正确释放,避免资源泄漏和数据损坏。
⚝ 日志记录: 建议在任务的异常处理代码中添加日志记录,以便于排查问题和监控系统运行状态。
⚝ 错误处理策略: 根据具体的应用场景,制定合适的错误处理策略。例如,对于关键任务,可能需要在异常发生时进行重试或告警;对于非关键任务,可以忽略异常或进行降级处理。
总结:
⚝ folly::CPUThreadPoolExecutor
默认捕获任务异常但不传播,而是忽略异常并继续执行后续任务。
⚝ 需要在任务代码中自行处理异常,例如使用 try-catch
块捕获异常并处理。
⚝ 可以使用 folly::Try<T>
作为任务返回值,显式地处理异常情况。
⚝ 编写并发代码时,务必考虑异常安全,并制定合适的错误处理策略。
⚝ CPUThreadPoolExecutor
的异常处理机制相对简单,更侧重于保证线程池的稳定运行,而不是提供复杂的异常传播和处理机制。
3.5 线程池监控与指标(Thread Pool Monitoring and Metrics)
线程池监控与指标(Thread Pool Monitoring and Metrics) 是评估线程池性能、诊断问题和进行性能调优的关键手段。通过监控线程池的各项指标,可以了解线程池的运行状态、资源利用率、任务处理能力等信息,从而及时发现潜在的瓶颈和问题,并采取相应的优化措施。
folly::CPUThreadPoolExecutor
提供了一些 API 来获取线程池的运行指标,开发者可以利用这些指标进行监控和分析。
3.5.1 性能指标(Performance Metrics)
CPUThreadPoolExecutor
提供以下主要的性能指标:
① 任务队列大小(Task Queue Size): 通过 getTaskQueueSize()
方法获取当前任务队列中等待执行的任务数量。
▮▮▮▮⚝ 指标意义: 反映了任务积压的程度。队列大小持续增长可能表示线程池处理能力不足,或者任务提交速率过快。
▮▮▮▮⚝ 监控用途: 判断线程池是否过载,是否需要扩容线程池大小。
② 活跃线程数(Active Thread Count): 通过 getStats().activeThreads
获取当前正在执行任务的线程数量。
▮▮▮▮⚝ 指标意义: 反映了线程池的繁忙程度。活跃线程数接近或达到最大线程数,可能表示线程池已经充分利用,但如果任务队列仍然很长,则可能需要进一步扩容。
▮▮▮▮⚝ 监控用途: 评估线程池的利用率,判断是否需要调整线程池大小。
③ 最大线程数(Max Thread Count): 通过 getStats().maxThreads
获取线程池配置的最大线程数量。
▮▮▮▮⚝ 指标意义: 线程池允许创建的最大线程数上限。
▮▮▮▮⚝ 监控用途: 了解线程池的容量上限,用于评估扩容空间。
④ 空闲线程数(Idle Thread Count): 可以通过 getStats().idleThreads
获取当前空闲的线程数量。
▮▮▮▮⚝ 指标意义: 反映了线程池的资源空闲程度。空闲线程数过多可能表示线程池资源浪费,可以考虑缩减线程池大小。
▮▮▮▮⚝ 监控用途: 评估线程池的资源利用率,判断是否需要缩减线程池大小。
⑤ 已完成任务数(Finished Task Count): 通过 getStats().finishedTasks
获取线程池已完成的任务总数。
▮▮▮▮⚝ 指标意义: 反映了线程池的总工作量。
▮▮▮▮⚝ 监控用途: 统计任务处理量,评估线程池的吞吐量。
⑥ 排队时间(Queueing Time)和执行时间(Execution Time): CPUThreadPoolExecutor
不直接提供任务的排队时间和执行时间指标。 如果需要监控这些指标,需要在任务代码中手动记录时间戳,并在任务执行前后计算时间差。
代码示例:监控线程池指标
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
3
#include <iostream>
4
#include <thread>
5
#include <chrono>
6
7
using namespace folly;
8
using namespace std;
9
10
int main() {
11
CPUThreadPoolExecutor executor(4, make_unique<LinkedBlockingQueue>());
12
executor.start();
13
14
// 提交一些任务
15
for (int i = 0; i < 10; ++i) {
16
executor.add([i]() {
17
this_thread::sleep_for(chrono::milliseconds(50));
18
});
19
}
20
21
// 监控线程池指标
22
for (int i = 0; i < 5; ++i) {
23
this_thread::sleep_for(chrono::seconds(1));
24
25
auto stats = executor.getStats();
26
cout << "--- Thread Pool Metrics ---" << endl;
27
cout << "Task Queue Size: " << executor.getTaskQueueSize() << endl;
28
cout << "Active Threads: " << stats.activeThreads << endl;
29
cout << "Max Threads: " << stats.maxThreads << endl;
30
cout << "Idle Threads: " << stats.idleThreads << endl;
31
cout << "Finished Tasks: " << stats.finishedTasks << endl;
32
cout << "-------------------------" << endl;
33
}
34
35
executor.stop();
36
return 0;
37
}
3.5.2 监控工具集成(Monitoring Tool Integration)
folly::CPUThreadPoolExecutor
本身没有内置的监控工具集成功能。 要将 CPUThreadPoolExecutor
的监控指标集成到现有的监控系统中,需要手动进行集成。 以下是一些常见的集成方式:
① 日志记录: 将线程池的指标信息定期记录到日志文件中。可以使用 folly::Logger
或其他日志库来实现。然后可以使用日志分析工具(例如 ELK Stack, Splunk)对日志进行分析和可视化。
② 指标上报系统: 将线程池的指标信息上报到指标监控系统,例如 Prometheus, Graphite, InfluxDB 等。可以使用相应的客户端库将指标数据发送到监控系统。
③ 自定义监控界面: 开发自定义的监控界面,实时展示线程池的各项指标。可以使用 Web 技术(例如 HTML, CSS, JavaScript)结合后端服务来实现。
集成步骤(以 Prometheus 为例):
① 选择 Prometheus 客户端库: 选择 C++ 的 Prometheus 客户端库,例如 prometheus-cpp
。
② 注册指标: 在程序初始化阶段,使用 Prometheus 客户端库注册需要监控的线程池指标,例如任务队列大小、活跃线程数等。
1
#include <prometheus/exposer.h>
2
#include <prometheus/registry.h>
3
#include <prometheus/gauge.h>
4
#include <folly/executors/CPUThreadPoolExecutor.h>
5
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
6
7
using namespace folly;
8
using namespace prometheus;
9
10
int main() {
11
// 创建 Prometheus Registry
12
auto registry = std::make_shared<Registry>();
13
14
// 创建 Gauge 指标
15
auto& taskQueueSizeGauge = BuildGauge()
16
.Name("thread_pool_task_queue_size")
17
.Help("Current task queue size of the thread pool")
18
.Register(*registry);
19
20
auto& activeThreadsGauge = BuildGauge()
21
.Name("thread_pool_active_threads")
22
.Help("Current active threads in the thread pool")
23
.Register(*registry);
24
25
// 创建 Exposer,用于暴露指标数据
26
Exposer exposer{"9100"}; // 监听端口 9100
27
exposer.RegisterCollectable(registry);
28
29
// 创建线程池
30
CPUThreadPoolExecutor executor(4, make_unique<LinkedBlockingQueue>());
31
executor.start();
32
33
// 定期更新指标数据
34
while (true) {
35
taskQueueSizeGauge.Set(executor.getTaskQueueSize());
36
activeThreadsGauge.Set(executor.getStats().activeThreads);
37
std::this_thread::sleep_for(std::chrono::seconds(5));
38
}
39
40
executor.stop();
41
return 0;
42
}
③ 暴露指标数据: 使用 Prometheus Exposer 将注册的指标数据暴露出来,供 Prometheus Server 抓取。
④ 配置 Prometheus Server: 配置 Prometheus Server,使其能够抓取应用程序暴露的指标数据。
⑤ 使用 Grafana 或其他工具进行可视化: 使用 Grafana 或其他可视化工具,连接 Prometheus Server,创建仪表盘,展示线程池的监控指标。
总结:
⚝ folly::CPUThreadPoolExecutor
提供了一些 API 来获取线程池的性能指标,例如任务队列大小、活跃线程数、最大线程数等。
⚝ 可以利用这些指标进行线程池的监控和性能分析。
⚝ CPUThreadPoolExecutor
没有内置的监控工具集成功能,需要手动进行集成。
⚝ 常见的集成方式包括日志记录、指标上报系统和自定义监控界面。
⚝ 可以使用 Prometheus 等监控系统,结合客户端库和 Exposer,将线程池指标集成到监控系统中,并使用 Grafana 等工具进行可视化。
⚝ 线程池监控是性能调优和问题诊断的重要手段,建议在生产环境中部署线程池监控系统。
3.6 配置选项详解(Detailed Explanation of Configuration Options)
folly::CPUThreadPoolExecutor
提供了丰富的配置选项,允许开发者根据具体的应用场景和性能需求,灵活地定制线程池的行为。 这些配置选项主要分为线程池参数配置和队列类型选择两类。
3.6.1 线程池参数配置(Thread Pool Parameter Configuration)
CPUThreadPoolExecutor
的构造函数允许配置以下关键参数:
① threads
(初始线程数和最大线程数): 构造函数的第一个参数 threads
同时指定了线程池的初始线程数和最大线程数。
▮▮▮▮⚝ 参数类型: size_t
▮▮▮▮⚝ 参数意义:
▮▮▮▮ⓐ 初始线程数: 线程池启动时创建的线程数量。
▮▮▮▮ⓑ 最大线程数: 线程池允许创建的最大线程数量。当任务队列积压且活跃线程数小于最大线程数时,线程池会创建新的线程来处理任务,直到达到最大线程数。
▮▮▮▮⚝ 默认值: 无默认值,必须显式指定。
▮▮▮▮⚝ 配置建议:
▮▮▮▮ⓐ CPU 密集型任务: 对于 CPU 密集型任务,线程数通常设置为 CPU 核心数或略多于核心数,例如 std::thread::hardware_concurrency()
或 2 * std::thread::hardware_concurrency()
。
▮▮▮▮ⓑ IO 密集型任务: 对于 IO 密集型任务,线程数可以设置得更大,因为线程在等待 IO 操作时会处于阻塞状态,可以利用多线程来提高并发度。例如,可以设置为 CPU 核心数的数倍,具体数值需要根据 IO 阻塞的程度和系统资源进行调整。
▮▮▮▮ⓒ 动态调整: 如 3.1 节所述,可以根据实际负载情况动态调整线程池大小。
② queue
(任务队列): 构造函数的第二个参数 queue
指定了线程池使用的任务队列。
▮▮▮▮⚝ 参数类型: std::unique_ptr<CustomTaskQueue>
▮▮▮▮⚝ 参数意义: 用于存储待执行任务的队列。任务提交到线程池后,首先进入任务队列,然后由线程池中的线程从队列中取出任务并执行。
▮▮▮▮⚝ 默认值: 如果不指定,CPUThreadPoolExecutor
默认使用 LinkedBlockingQueue
作为任务队列。
▮▮▮▮⚝ 配置建议: 队列类型的选择将在 3.6.2 节详细讨论。
③ threadFactory
(线程工厂): CPUThreadPoolExecutor
允许通过 ThreadFactory
来自定义线程的创建方式,但在构造函数中没有直接的参数来配置 ThreadFactory
。 ThreadFactory
通常是在更底层的线程池基类中配置的,CPUThreadPoolExecutor
继承了这些配置能力。
▮▮▮▮⚝ 参数类型: std::shared_ptr<ThreadFactory>
(通常通过基类配置)
▮▮▮▮⚝ 参数意义: 用于创建线程池中的工作线程。可以自定义线程的属性,例如线程名称、优先级、栈大小等。
▮▮▮▮⚝ 默认值: 默认使用 DefaultThreadFactory
,创建默认属性的线程。
▮▮▮▮⚝ 配置建议: 在大多数情况下,默认的 DefaultThreadFactory
已经足够使用。 如果需要自定义线程属性,例如设置线程名称以便于调试和监控,可以创建自定义的 ThreadFactory
。
代码示例:配置线程池参数
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
3
#include <folly/executors/NamedThreadFactory.h> // NamedThreadFactory
4
#include <iostream>
5
6
using namespace folly;
7
using namespace std;
8
9
int main() {
10
// 配置线程池参数:
11
// - 线程数:8 (初始线程数和最大线程数都为 8)
12
// - 任务队列:使用默认的 LinkedBlockingQueue
13
// - 线程工厂:使用 NamedThreadFactory,设置线程名称前缀为 "MyThreadPool-Worker-"
14
CPUThreadPoolExecutor executor(
15
8,
16
make_unique<LinkedBlockingQueue>(),
17
std::make_shared<NamedThreadFactory>("MyThreadPool-Worker-"));
18
19
executor.start();
20
21
for (int i = 0; i < 10; ++i) {
22
executor.add([i]() {
23
cout << "Task " << i << " executed by thread " << this_thread::get_id() << endl;
24
this_thread::sleep_for(chrono::milliseconds(100));
25
});
26
}
27
28
this_thread::sleep_for(chrono::seconds(2));
29
executor.stop();
30
return 0;
31
}
3.6.2 队列类型选择(Queue Type Selection)
folly::CPUThreadPoolExecutor
允许使用多种类型的任务队列,不同的队列类型具有不同的特性和适用场景。 选择合适的队列类型对于线程池的性能和行为至关重要。 folly
库本身提供了一些常用的任务队列实现,开发者也可以自定义任务队列(如 3.3 节所述)。
folly
库提供的常见任务队列类型:
① LinkedBlockingQueue
: 基于链表的无界阻塞队列。
▮▮▮▮⚝ 特性:
▮▮▮▮ⓐ 无界性: 理论上队列容量没有上限,可以容纳无限多的任务(受限于系统内存)。
▮▮▮▮ⓑ 阻塞性: 当队列为空时,take()
操作会阻塞等待,直到有任务可用;当队列已满时(实际上 LinkedBlockingQueue
不会满),add()
操作不会阻塞。
▮▮▮▮⚝ 适用场景: 通用场景,适用于任务提交速率和处理速率相对平衡的情况。 由于是无界队列,需要注意任务积压过多导致内存溢出的风险。
▮▮▮▮⚝ 优点: 实现简单,性能较好,适用于大多数场景。
▮▮▮▮⚝ 缺点: 无界队列,可能导致内存溢出风险。
② ArrayBlockingQueue
: 基于数组的有界阻塞队列。
▮▮▮▮⚝ 特性:
▮▮▮▮ⓐ 有界性: 队列容量有限,在创建时需要指定容量大小。
▮▮▮▮ⓑ 阻塞性: 当队列为空时,take()
操作会阻塞等待;当队列已满时,add()
操作会阻塞等待,直到队列有空间。
▮▮▮▮⚝ 适用场景: 适用于需要限制任务队列大小,防止任务积压过多导致内存溢出的场景。 也适用于需要控制任务提交速率的场景,当队列满时,任务提交者会被阻塞,从而实现流量控制。
▮▮▮▮⚝ 优点: 有界队列,可以防止内存溢出;可以实现流量控制。
▮▮▮▮⚝ 缺点: 固定容量,可能在某些场景下限制线程池的吞吐量;数组实现,可能在队列扩容时有性能开销。
③ PriorityBlockingQueue
: 基于优先级堆的无界优先级阻塞队列(自定义实现,如 3.2 节所示)。
▮▮▮▮⚝ 特性:
▮▮▮▮ⓐ 优先级调度: 任务按照优先级顺序出队,高优先级任务优先执行。
▮▮▮▮ⓑ 无界性: 理论上队列容量没有上限。
▮▮▮▮ⓒ 阻塞性: 当队列为空时,take()
操作会阻塞等待。
▮▮▮▮⚝ 适用场景: 适用于需要根据任务优先级进行调度的场景,例如实时系统、QoS 保障系统等。
▮▮▮▮⚝ 优点: 支持优先级调度,可以保证高优先级任务的及时处理。
▮▮▮▮⚝ 缺点: 优先级队列的实现相对复杂,性能可能比普通队列略差;无界队列,可能导致内存溢出风险;需要注意优先级反转和优先级饥饿问题。
④ LifoBlockingQueue
: 后进先出阻塞队列(Stack-like Queue)。
▮▮▮▮⚝ 特性:
▮▮▮▮ⓐ LIFO 调度: 任务按照后进先出顺序出队,类似于栈。
▮▮▮▮ⓑ 阻塞性: 当队列为空时,take()
操作会阻塞等待。
▮▮▮▮⚝ 适用场景: 某些特定的任务处理模式,例如深度优先搜索、回溯算法等。 在某些缓存场景中,LIFO 队列可以提高缓存命中率。
▮▮▮▮⚝ 优点: 适用于特定的任务处理模式,可能提高缓存命中率。
▮▮▮▮⚝ 缺点: 适用场景有限,通用性较差。
队列类型选择建议:
⚝ 通用场景: LinkedBlockingQueue
通常是默认且不错的选择,适用于大多数通用场景。
⚝ 防止内存溢出: 如果需要严格控制任务队列的大小,防止任务积压过多导致内存溢出,可以使用 ArrayBlockingQueue
。
⚝ 优先级调度: 如果需要根据任务优先级进行调度,可以使用自定义的 PriorityBlockingQueue
。
⚝ 特定任务模式: 在某些特定的任务处理模式下,例如需要 LIFO 调度时,可以使用 LifoBlockingQueue
。
⚝ 性能敏感场景: 在对性能要求非常高的场景中,需要根据具体的 workload 进行性能测试和评估,选择性能最佳的队列类型。 可能需要考虑自定义更高效的队列实现。
代码示例:选择不同的队列类型
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/task_queue/LinkedBlockingQueue.h>
3
#include <folly/executors/task_queue/ArrayBlockingQueue.h>
4
// #include "PriorityBlockingQueue.h" // 自定义的优先级队列 (如果使用)
5
// #include <folly/executors/task_queue/LifoBlockingQueue.h> // folly 库中没有 LifoBlockingQueue,需要自定义
6
7
#include <iostream>
8
9
using namespace folly;
10
using namespace std;
11
12
int main() {
13
// 使用 LinkedBlockingQueue (默认)
14
CPUThreadPoolExecutor executor1(4, make_unique<LinkedBlockingQueue>());
15
executor1.start();
16
cout << "Using LinkedBlockingQueue" << endl;
17
executor1.stop();
18
19
// 使用 ArrayBlockingQueue,容量为 10
20
CPUThreadPoolExecutor executor2(4, make_unique<ArrayBlockingQueue>(10));
21
executor2.start();
22
cout << "Using ArrayBlockingQueue with capacity 10" << endl;
23
executor2.stop();
24
25
// 使用自定义的 PriorityBlockingQueue (假设已实现)
26
// CPUThreadPoolExecutor executor3(4, make_unique<PriorityBlockingQueue>());
27
// executor3.start();
28
// cout << "Using PriorityBlockingQueue" << endl;
29
// executor3.stop();
30
31
return 0;
32
}
总结:
⚝ folly::CPUThreadPoolExecutor
允许选择多种类型的任务队列,包括 LinkedBlockingQueue
, ArrayBlockingQueue
等。
⚝ 不同的队列类型具有不同的特性和适用场景。
⚝ LinkedBlockingQueue
适用于通用场景,ArrayBlockingQueue
适用于需要限制队列大小的场景,PriorityBlockingQueue
适用于优先级调度,LifoBlockingQueue
适用于特定任务模式。
⚝ 选择合适的队列类型对于线程池的性能和行为至关重要,需要根据具体的应用场景和需求进行选择。
⚝ 开发者也可以自定义任务队列,以满足更特殊的需求。
END_OF_CHAPTER
4. chapter 4: folly::CPUThreadPoolExecutor
高级应用与实战
4.1 高并发场景下的 CPUThreadPoolExecutor
应用
在高并发(High Concurrency)场景中,系统需要同时处理大量的请求或任务。这种场景常见于Web服务器、在线游戏、实时数据处理系统等。有效地管理和调度并发任务是保证系统性能和稳定性的关键。folly::CPUThreadPoolExecutor
正是在这种高并发环境下大放异彩的利器。
高并发场景的挑战
① 资源竞争(Resource Contention):大量并发任务同时执行,可能导致对CPU、内存、I/O等资源的激烈竞争,降低系统整体吞吐量。
② 上下文切换开销(Context Switching Overhead):频繁的线程上下文切换会消耗大量的CPU时间,尤其在高并发下,这种开销更加显著。
③ 响应延迟增加(Increased Response Latency):如果任务调度不合理,或者线程创建销毁频繁,会导致请求响应时间延长,用户体验下降。
④ 系统稳定性风险(System Stability Risk):在高负载压力下,资源耗尽或线程管理不当可能导致系统崩溃。
CPUThreadPoolExecutor
在高并发场景中的优势
CPUThreadPoolExecutor
通过线程池化技术,有效地缓解了上述挑战:
① 减少线程创建和销毁开销:线程池预先创建一定数量的线程,任务到来时直接从线程池中获取线程执行,避免了频繁创建和销毁线程的开销,提高了任务处理效率。
② 控制并发度,避免资源耗尽:线程池可以限制同时执行的任务数量,防止无限制地创建线程导致系统资源耗尽,保证系统稳定性。
③ 提高资源利用率:线程池中的线程可以被多个任务复用,提高了CPU等资源的利用率。
④ 提供灵活的任务调度策略:CPUThreadPoolExecutor
提供了多种任务队列和调度策略,可以根据不同的应用场景进行灵活配置,优化任务执行效率。
高并发场景应用示例
假设我们构建一个高并发的Web服务器,需要处理大量的用户请求。每个请求的处理逻辑可能包括:
- 接收客户端请求。
- 解析请求参数。
- 查询数据库。
- 处理业务逻辑。
- 生成响应结果。
- 发送响应给客户端。
在高并发场景下,如果为每个请求都创建一个新的线程来处理,将会导致大量的线程创建和销毁开销,以及频繁的上下文切换,严重影响服务器性能。使用 CPUThreadPoolExecutor
可以有效地解决这个问题。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/futures/Future.h>
3
#include <iostream>
4
#include <string>
5
6
using namespace folly;
7
8
// 模拟请求处理函数
9
std::string handleRequest(const std::string& request) {
10
// 模拟耗时操作
11
std::this_thread::sleep_for(std::chrono::milliseconds(100));
12
return "Response for: " + request;
13
}
14
15
int main() {
16
// 创建一个固定大小为 16 的线程池
17
CPUThreadPoolExecutor executor(16);
18
executor.start();
19
20
std::vector<Future<std::string>> futures;
21
for (int i = 0; i < 100; ++i) {
22
std::string request = "Request-" + std::to_string(i);
23
// 提交任务到线程池
24
futures.push_back(executor.add([request]() {
25
return handleRequest(request);
26
}));
27
}
28
29
// 等待所有任务完成并获取结果
30
for (auto& future : futures) {
31
std::cout << future.get() << std::endl;
32
}
33
34
executor.stop();
35
return 0;
36
}
在这个例子中,我们创建了一个固定大小为 16 的 CPUThreadPoolExecutor
。当有新的请求到来时,我们使用 executor.add()
方法将请求处理任务提交到线程池。线程池会自动从线程池中选择一个空闲线程来执行任务。由于线程池的大小是固定的,在高并发请求下,线程池会控制并发执行的任务数量,避免系统资源被耗尽。
总结
在高并发场景下,CPUThreadPoolExecutor
通过线程池化技术,有效地降低了线程管理开销,提高了资源利用率,并提供了灵活的任务调度策略,是构建高性能、高稳定性的并发系统的关键组件。合理配置线程池的大小和参数,可以充分发挥 CPUThreadPoolExecutor
的优势,应对各种高并发挑战。
4.2 结合 folly
库其他组件的应用
folly
库作为一个强大的C++库,提供了丰富的组件来支持高性能和高效率的开发。CPUThreadPoolExecutor
可以与 folly
库中的其他组件协同工作,构建更加强大和灵活的并发应用。
与 folly::Future/Promise
结合
folly::Future
和 folly::Promise
是 folly
库中用于异步编程的重要组件。Future
代表异步操作的结果,Promise
用于设置 Future
的结果。CPUThreadPoolExecutor
可以与 Future/Promise
结合,实现更加灵活和强大的异步任务管理。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/futures/Future.h>
3
#include <folly/futures/Promise.h>
4
#include <iostream>
5
#include <string>
6
7
using namespace folly;
8
9
// 模拟耗时任务,使用 Promise 设置 Future 的结果
10
Future<std::string> asyncTask(CPUThreadPoolExecutor& executor, const std::string& taskName) {
11
Promise<std::string> promise;
12
auto future = promise.getFuture();
13
14
executor.add([promise = std::move(promise), taskName]() mutable {
15
// 模拟耗时操作
16
std::this_thread::sleep_for(std::chrono::milliseconds(200));
17
promise.setValue("Task " + taskName + " completed in thread " + std::to_string(std::this_thread::get_id().hash()));
18
});
19
return future;
20
}
21
22
int main() {
23
CPUThreadPoolExecutor executor(4);
24
executor.start();
25
26
std::vector<Future<std::string>> futures;
27
for (int i = 0; i < 5; ++i) {
28
futures.push_back(asyncTask(executor, std::to_string(i)));
29
}
30
31
// 等待所有 Future 完成并获取结果
32
for (auto& future : futures) {
33
std::cout << future.get() << std::endl;
34
}
35
36
executor.stop();
37
return 0;
38
}
在这个例子中,asyncTask
函数创建了一个 Promise
和对应的 Future
。任务提交到 CPUThreadPoolExecutor
后,在线程池中的线程执行任务,并在任务完成后通过 promise.setValue()
设置 Future
的结果。主线程可以通过 future.get()
等待任务完成并获取结果。这种方式实现了异步任务的提交、执行和结果获取,使得并发编程更加灵活和易于管理。
与 folly::Function
结合
folly::Function
是 folly
库中一个高性能的函数对象封装器,可以用于存储和调用各种类型的可调用对象,包括普通函数、Lambda 表达式、成员函数等。CPUThreadPoolExecutor
可以与 folly::Function
结合,提高任务提交的灵活性和效率。
虽然 CPUThreadPoolExecutor::add()
方法本身就可以接受各种可调用对象,但使用 folly::Function
可以更明确地管理和传递函数对象,尤其是在需要对函数对象进行更复杂操作的场景下。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/Function.h>
3
#include <iostream>
4
5
using namespace folly;
6
7
void simpleTask(int id) {
8
std::cout << "Task " << id << " executed in thread " << std::this_thread::get_id().hash() << std::endl;
9
}
10
11
int main() {
12
CPUThreadPoolExecutor executor(2);
13
executor.start();
14
15
for (int i = 0; i < 4; ++i) {
16
// 使用 folly::Function 封装任务
17
Function<void()> task = std::bind(simpleTask, i);
18
executor.add(std::move(task)); // 注意 move,避免拷贝
19
}
20
21
executor.stop();
22
return 0;
23
}
在这个例子中,我们使用 folly::Function<void()>
封装了 simpleTask
函数,并通过 std::bind
绑定了任务 ID。然后将 folly::Function
对象提交到 CPUThreadPoolExecutor
执行。使用 folly::Function
可以更清晰地表达任务的类型,并且在某些情况下可以提高性能,尤其是在处理复杂的函数对象时。
与其他 folly
组件的潜在结合
除了 Future/Promise
和 Function
,CPUThreadPoolExecutor
还可以与其他 folly
组件结合,例如:
⚝ folly::ConcurrentQueue
: 可以作为 CPUThreadPoolExecutor
的任务队列,提供更高效的并发队列操作。
⚝ folly::Singleton
: 可以用于管理全局唯一的 CPUThreadPoolExecutor
实例。
⚝ folly::Benchmark
: 可以用于对使用 CPUThreadPoolExecutor
的代码进行性能基准测试。
⚝ folly::dynamic
: 在需要处理动态类型任务的场景下,可以结合 folly::dynamic
来处理不同类型的任务。
总结
CPUThreadPoolExecutor
与 folly
库的其他组件结合使用,可以构建更加强大、灵活和高效的并发应用。Future/Promise
提供了异步编程的基础设施,Function
提高了函数对象管理的灵活性,而其他 folly
组件则可以从不同方面增强 CPUThreadPoolExecutor
的功能和性能。深入理解和灵活运用这些组件,可以充分发挥 folly
库的优势,构建高质量的并发系统。
4.3 异步编程(Asynchronous Programming)与 CPUThreadPoolExecutor
异步编程(Asynchronous Programming)是一种并发编程模式,它允许程序发起一个任务后,不必等待任务完成就可以继续执行后续操作。当任务完成时,程序会得到通知并处理任务结果。异步编程的核心思想是非阻塞(Non-blocking),它可以提高程序的并发性和响应性。
异步编程的优势
① 提高程序响应性(Improved Responsiveness):在执行耗时操作时,程序不会被阻塞,可以继续响应用户操作或其他事件,提升用户体验。
② 提高资源利用率(Improved Resource Utilization):异步编程可以充分利用CPU和I/O资源,提高系统的吞吐量和效率。
③ 简化并发编程模型(Simplified Concurrent Programming Model):相比于传统的线程和锁机制,异步编程通常使用更简洁的模型,例如 Future/Promise
、回调函数、协程等,降低了并发编程的复杂性。
CPUThreadPoolExecutor
在异步编程中的作用
CPUThreadPoolExecutor
是实现异步编程的重要工具。它可以将异步任务提交到线程池中执行,从而实现任务的并发执行,而主线程无需等待任务完成。结合 folly::Future/Promise
,可以构建完整的异步编程模型。
异步编程模型示例
以下示例展示了如何使用 CPUThreadPoolExecutor
和 folly::Future/Promise
实现异步任务的提交、执行和结果获取。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/futures/Future.h>
3
#include <folly/futures/Promise.h>
4
#include <iostream>
5
#include <string>
6
7
using namespace folly;
8
9
// 模拟异步任务
10
Future<int> asyncCalculate(CPUThreadPoolExecutor& executor, int a, int b) {
11
Promise<int> promise;
12
auto future = promise.getFuture();
13
14
executor.add([promise = std::move(promise), a, b]() mutable {
15
// 模拟耗时计算
16
std::this_thread::sleep_for(std::chrono::milliseconds(300));
17
promise.setValue(a + b);
18
});
19
return future;
20
}
21
22
int main() {
23
CPUThreadPoolExecutor executor(4);
24
executor.start();
25
26
// 发起异步计算任务
27
auto future1 = asyncCalculate(executor, 10, 20);
28
auto future2 = asyncCalculate(executor, 30, 40);
29
30
// 主线程可以继续执行其他操作,无需等待异步任务完成
31
std::cout << "Async tasks submitted, main thread continues to execute..." << std::endl;
32
33
// 等待异步任务完成并获取结果
34
int result1 = future1.get();
35
int result2 = future2.get();
36
37
std::cout << "Result 1: " << result1 << std::endl;
38
std::cout << "Result 2: " << result2 << std::endl;
39
40
executor.stop();
41
return 0;
42
}
在这个例子中,asyncCalculate
函数返回一个 Future<int>
,代表异步计算的结果。任务提交到 CPUThreadPoolExecutor
后,在线程池中异步执行。主线程在提交任务后可以立即返回,继续执行后续操作。当需要获取异步任务结果时,可以通过 future.get()
方法等待任务完成并获取结果。
异步编程与回调函数
除了 Future/Promise
,回调函数(Callback Function)也是异步编程中常用的机制。CPUThreadPoolExecutor
可以结合回调函数来处理异步任务的结果。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <string>
4
#include <functional>
5
6
using namespace folly;
7
8
// 异步任务,接受回调函数
9
void asyncTaskWithCallback(CPUThreadPoolExecutor& executor, const std::string& taskName, std::function<void(const std::string&)> callback) {
10
executor.add([taskName, callback]() {
11
// 模拟耗时操作
12
std::this_thread::sleep_for(std::chrono::milliseconds(250));
13
std::string result = "Task " + taskName + " completed";
14
callback(result); // 执行回调函数,传递结果
15
});
16
}
17
18
int main() {
19
CPUThreadPoolExecutor executor(2);
20
executor.start();
21
22
// 提交异步任务,并设置回调函数
23
asyncTaskWithCallback(executor, "Task-A", [](const std::string& result) {
24
std::cout << "Callback received result for Task-A: " << result << std::endl;
25
});
26
27
asyncTaskWithCallback(executor, "Task-B", [](const std::string& result) {
28
std::cout << "Callback received result for Task-B: " << result << std::endl;
29
});
30
31
// 主线程可以继续执行其他操作
32
std::cout << "Async tasks submitted with callbacks, main thread continues..." << std::endl;
33
34
// 为了演示效果,主线程等待一段时间,确保回调函数有机会执行
35
std::this_thread::sleep_for(std::chrono::seconds(1));
36
37
executor.stop();
38
return 0;
39
}
在这个例子中,asyncTaskWithCallback
函数接受一个回调函数 callback
作为参数。异步任务执行完成后,会调用回调函数,并将任务结果作为参数传递给回调函数。这种方式通过回调函数机制处理异步任务的结果,也是一种常见的异步编程模式。
总结
CPUThreadPoolExecutor
是实现异步编程的关键组件。结合 folly::Future/Promise
或回调函数,可以构建灵活高效的异步编程模型,提高程序的响应性和资源利用率。异步编程在高并发、I/O密集型应用中尤其重要,可以显著提升系统性能和用户体验。
4.4 性能优化与调优
CPUThreadPoolExecutor
提供了强大的并发处理能力,但要充分发挥其性能,需要进行合理的配置和调优。性能优化与调优主要集中在以下几个方面:
4.4.1 线程池大小选择策略
线程池的大小直接影响系统的并发处理能力和资源利用率。线程池过小,无法充分利用系统资源,导致任务排队等待,降低吞吐量;线程池过大,会增加线程上下文切换的开销,甚至导致资源耗尽。因此,选择合适的线程池大小至关重要。
影响线程池大小的因素
① CPU 核心数(Number of CPU Cores):CPU 核心数是决定线程池大小的重要因素。通常情况下,CPU 密集型任务的线程池大小可以设置为 CPU 核心数或略多于核心数。对于 I/O 密集型任务,由于线程在等待 I/O 操作时不会占用 CPU,可以将线程池大小设置得更大,例如 CPU 核心数的 2 倍甚至更多。
② 任务类型(Task Type):任务类型决定了线程池的最佳大小。
▮▮▮▮⚝ CPU 密集型任务(CPU-bound Tasks):这类任务主要消耗 CPU 资源进行计算,例如图像处理、视频编码、复杂算法等。对于 CPU 密集型任务,线程池大小不宜设置过大,通常设置为 CPU 核心数 + 1
即可,甚至可以等于 CPU 核心数。过多的线程反而会增加上下文切换开销,降低性能。
▮▮▮▮⚝ I/O 密集型任务(I/O-bound Tasks):这类任务主要等待 I/O 操作完成,例如网络请求、数据库查询、文件读写等。线程在等待 I/O 时,CPU 处于空闲状态。为了充分利用 CPU 资源,可以适当增加线程池大小。经验公式:线程池大小 = CPU 核心数 * (1 + I/O 耗时 / CPU 耗时)
。例如,如果 I/O 耗时是 CPU 耗时的 2 倍,则线程池大小可以设置为 CPU 核心数 * 3
。
③ 系统负载(System Load):系统负载也会影响线程池大小的选择。在高负载情况下,可以适当减小线程池大小,避免资源竞争过于激烈。在低负载情况下,可以适当增加线程池大小,提高系统吞吐量。
④ 内存限制(Memory Limits):线程本身也会消耗内存资源。如果系统内存有限,需要考虑线程池大小对内存的消耗,避免内存溢出。
动态调整线程池大小
在实际应用中,任务类型和系统负载可能会动态变化。为了更好地适应这些变化,可以考虑动态调整线程池大小。CPUThreadPoolExecutor
本身不直接提供动态调整大小的 API,但可以通过一些技巧实现动态调整,例如:
⚝ 监控系统指标:监控 CPU 使用率、任务队列长度、线程池活跃线程数等指标。
⚝ 根据指标调整:当 CPU 使用率过低或任务队列过长时,可以适当增加线程池大小;当 CPU 使用率过高或线程上下文切换频繁时,可以适当减小线程池大小。
⚝ 使用定时任务:定期检查系统指标,并根据指标调整线程池大小。
经验公式与建议
⚝ CPU 密集型任务:线程池大小 ≈ CPU 核心数
或 线程池大小 ≈ CPU 核心数 + 1
⚝ I/O 密集型任务:线程池大小 ≈ CPU 核心数 * (1 + I/O 耗时 / CPU 耗时)
,或者根据经验值设置为 CPU 核心数 * 2
或更大。
⚝ 初始值设置:可以先根据经验公式设置一个初始值,然后在实际运行中根据监控数据进行微调。
⚝ 压力测试:通过压力测试找到最佳线程池大小。逐渐增加并发请求量,观察系统吞吐量和响应时间,找到吞吐量最大且响应时间稳定的线程池大小。
4.4.2 任务调度优化
任务调度策略也会影响线程池的性能。CPUThreadPoolExecutor
提供了多种任务队列类型,不同的队列类型具有不同的调度特性。选择合适的任务队列类型,可以优化任务调度,提高线程池的性能。
常见的任务队列类型
① LifoSemMPMCQueue
(默认队列):后进先出(LIFO)的无锁多生产者多消费者(MPMC)队列。LIFO 队列的优点是可以提高 CPU 缓存命中率,因为后提交的任务很可能与当前正在执行的任务相关,放在缓存中的数据更有可能被复用。适用于任务之间存在局部性,或者希望尽快处理最新提交的任务的场景。
② MPMCQueue
: 先进先出(FIFO)的无锁 MPMC 队列。FIFO 队列保证任务按照提交顺序执行,适用于需要保证任务执行顺序的场景,例如消息队列、事件处理等。
③ PriorityMPMCQueue
: 优先级 MPMC 队列。可以为任务设置优先级,优先级高的任务优先执行。适用于需要优先处理重要任务的场景,例如服务质量(QoS)保障、紧急任务处理等。
任务调度策略选择建议
① 默认队列 LifoSemMPMCQueue
: 对于大多数 CPU 密集型任务和对任务执行顺序没有严格要求的场景,默认队列 LifoSemMPMCQueue
通常是一个不错的选择,可以提高 CPU 缓存命中率,提升性能。
② MPMCQueue
: 如果需要严格保证任务的执行顺序,例如按照提交顺序处理请求,或者实现公平的任务调度,可以选择 MPMCQueue
。
③ PriorityMPMCQueue
: 如果需要优先处理某些重要任务,例如需要保证高优先级任务的低延迟,可以使用 PriorityMPMCQueue
,并为任务设置合适的优先级。
自定义任务队列
CPUThreadPoolExecutor
允许自定义任务队列。如果内置的任务队列类型不能满足需求,可以根据具体应用场景,实现自定义的任务队列。例如,可以实现一个基于时间窗口的任务队列,或者一个具有更复杂调度策略的队列。
任务提交策略
任务提交策略也会影响任务调度效率。例如,批量提交任务可以减少任务提交的开销。在某些场景下,可以考虑将多个小任务合并成一个大任务提交,减少任务调度的次数。
总结
性能优化与调优是充分发挥 CPUThreadPoolExecutor
性能的关键。合理的线程池大小选择策略和任务调度优化策略,可以显著提高系统的并发处理能力和资源利用率。在实际应用中,需要根据任务类型、系统负载和性能需求,选择合适的线程池大小和任务队列类型,并进行持续的监控和调优。
4.5 案例分析:使用 CPUThreadPoolExecutor
构建高性能服务
本节通过一个案例分析,展示如何使用 CPUThreadPoolExecutor
构建高性能服务。我们将以构建一个简单的在线图片处理服务为例,说明 CPUThreadPoolExecutor
在实际应用中的价值和使用方法。
服务需求
构建一个在线图片处理服务,用户可以上传图片,并选择不同的处理操作,例如:
- 图片缩放(Image Scaling):将图片缩放到指定尺寸。
- 图片裁剪(Image Cropping):裁剪图片指定区域。
- 图片滤镜(Image Filtering):应用各种滤镜效果。
- 格式转换(Format Conversion):将图片转换为不同的格式(例如 JPEG, PNG, WebP)。
服务需要支持高并发请求,保证低延迟和高吞吐量。
服务架构设计
服务架构可以设计为典型的客户端-服务器(Client-Server)模式。
- 客户端(Client):可以是 Web 浏览器、移动应用或命令行工具,负责用户交互、上传图片和接收处理结果。
- 服务器(Server):负责接收客户端请求、调度图片处理任务、执行图片处理操作、返回处理结果。
服务器端的核心组件包括:
⚝ 请求接收模块(Request Handling Module):负责接收客户端请求,解析请求参数,并将请求分发到任务处理模块。可以使用高性能的 Web 服务器框架,例如 nghttp2
或 proxygen
(基于 folly
库)。
⚝ 任务队列(Task Queue):用于存储待处理的图片处理任务。可以使用 CPUThreadPoolExecutor
的任务队列,或者自定义队列。
⚝ 任务处理模块(Task Processing Module):负责从任务队列中获取任务,调用图片处理库执行具体的图片处理操作。使用 CPUThreadPoolExecutor
管理线程池,并发执行图片处理任务。
⚝ 图片处理库(Image Processing Library):提供各种图片处理算法和功能。可以使用开源的图片处理库,例如 libvips
或 ImageMagick
。
⚝ 结果返回模块(Response Handling Module):负责将图片处理结果返回给客户端。
使用 CPUThreadPoolExecutor
实现任务处理模块
任务处理模块是服务的核心,使用 CPUThreadPoolExecutor
可以有效地实现并发图片处理。
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/futures/Future.h>
3
#include <folly/futures/Promise.h>
4
#include <iostream>
5
#include <string>
6
#include <vector>
7
8
// 假设的图片处理函数 (使用占位符)
9
std::string processImage(const std::string& imagePath, const std::string& operation) {
10
// 模拟图片处理耗时
11
std::this_thread::sleep_for(std::chrono::milliseconds(50));
12
return "Processed: " + imagePath + ", Operation: " + operation;
13
}
14
15
class ImageProcessingService {
16
public:
17
ImageProcessingService(int threadPoolSize) : executor_(threadPoolSize) {
18
executor_.start();
19
}
20
21
~ImageProcessingService() {
22
executor_.stop();
23
}
24
25
Future<std::string> submitTask(const std::string& imagePath, const std::string& operation) {
26
Promise<std::string> promise;
27
auto future = promise.getFuture();
28
executor_.add([promise = std::move(promise), imagePath, operation]() mutable {
29
std::string result = processImage(imagePath, operation);
30
promise.setValue(result);
31
});
32
return future;
33
}
34
35
private:
36
folly::CPUThreadPoolExecutor executor_;
37
};
38
39
int main() {
40
ImageProcessingService service(8); // 创建线程池大小为 8 的图片处理服务
41
42
std::vector<Future<std::string>> futures;
43
std::vector<std::pair<std::string, std::string>> tasks = {
44
{"image1.jpg", "scale"},
45
{"image2.png", "crop"},
46
{"image3.bmp", "filter"},
47
{"image4.gif", "convert"},
48
{"image5.jpeg", "scale"},
49
{"image6.webp", "crop"},
50
{"image7.tiff", "filter"},
51
{"image8.raw", "convert"},
52
{"image9.jpg", "scale"},
53
{"image10.png", "crop"}
54
};
55
56
for (const auto& task : tasks) {
57
futures.push_back(service.submitTask(task.first, task.second));
58
}
59
60
for (auto& future : futures) {
61
std::cout << future.get() << std::endl;
62
}
63
64
return 0;
65
}
在这个案例中,ImageProcessingService
类封装了 CPUThreadPoolExecutor
,用于管理图片处理任务的线程池。submitTask
方法将图片处理任务提交到线程池异步执行,并返回 Future<std::string>
代表处理结果。主函数模拟提交多个图片处理任务,并等待所有任务完成。
性能优化与扩展
⚝ 线程池大小调优:根据实际的图片处理任务类型和系统负载,调整 CPUThreadPoolExecutor
的线程池大小。对于 CPU 密集型的图片处理算法,线程池大小可以设置为 CPU 核心数或略多于核心数;对于 I/O 密集型的图片处理(例如网络图片下载、大文件读写),可以适当增加线程池大小。
⚝ 任务队列选择:根据任务特性选择合适的任务队列类型。默认的 LifoSemMPMCQueue
适用于大多数场景。如果需要保证任务处理顺序,可以使用 MPMCQueue
。
⚝ 图片处理库优化:选择高性能的图片处理库,例如 libvips
,并进行相应的性能优化配置。
⚝ 服务扩展:可以使用负载均衡技术,将请求分发到多台服务器,提高服务的整体处理能力和可用性。可以使用分布式任务队列,将任务分发到不同的任务处理节点。
总结
通过这个案例分析,我们展示了如何使用 CPUThreadPoolExecutor
构建高性能的在线图片处理服务。CPUThreadPoolExecutor
提供了强大的并发处理能力,可以有效地提高服务的吞吐量和响应速度。合理的架构设计、线程池配置和性能优化,可以构建出满足高并发、低延迟需求的高性能服务。
END_OF_CHAPTER
5. chapter 5: folly::CPUThreadPoolExecutor
API 全面解析
5.1 类结构与继承关系(Class Structure and Inheritance Relationship)
folly::CPUThreadPoolExecutor
是 folly
库中用于管理和执行计算密集型任务的核心组件。理解其类结构与继承关系,有助于我们从宏观层面把握其设计理念和功能特性。CPUThreadPoolExecutor
并非孤立存在,它构建于 folly
库提供的其他基础组件之上,并遵循一定的设计模式。
CPUThreadPoolExecutor
的类结构相对独立,它并没有复杂的继承体系。这体现了其设计上的简洁性和专注性,旨在提供一个高效、易用的 CPU 密集型任务线程池实现。
① 核心类:folly::CPUThreadPoolExecutor
CPUThreadPoolExecutor
是本章的主角,也是我们关注的重点。它是一个独立的类,主要负责线程池的生命周期管理、任务调度、线程管理等核心功能。
② 关联组件与依赖
虽然 CPUThreadPoolExecutor
没有明显的继承关系,但它与 folly
库中的其他组件紧密协作,以实现其强大的功能。主要的关联组件包括:
⚝ folly::Executor
接口:CPUThreadPoolExecutor
隐式地实现了 folly::Executor
接口。Executor
是 folly
库中定义的一个抽象接口,用于表示任务执行器。它定义了任务提交和执行的基本方法,例如 add
和 post
。虽然 CPUThreadPoolExecutor
没有显式继承 Executor
,但它提供了 Executor
接口所要求的功能,因此可以被视为一种 Executor
的实现。
⚝ folly::BlockingQueue
:CPUThreadPoolExecutor
内部使用 folly::BlockingQueue
作为任务队列,用于存储待执行的任务。BlockingQueue
提供了线程安全、高效的队列操作,支持阻塞式的入队和出队,是实现线程池的关键数据结构。folly
库提供了多种 BlockingQueue
的实现,例如 LinkedBlockingQueue
和 ArrayBlockingQueue
,CPUThreadPoolExecutor
可以根据不同的需求选择合适的队列类型。
⚝ std::thread
:CPUThreadPoolExecutor
基于标准库的 std::thread
来创建和管理工作线程。它利用 std::thread
提供的线程创建、启动、停止和同步等功能,实现了线程池中工作线程的生命周期管理。
⚝ folly::ThreadFactory
:CPUThreadPoolExecutor
可以配置 folly::ThreadFactory
来定制工作线程的创建过程。ThreadFactory
允许用户自定义线程的名称、优先级、栈大小等属性,从而更好地满足特定应用场景的需求。
③ 设计模式
CPUThreadPoolExecutor
的设计体现了线程池设计模式的核心思想。线程池模式旨在通过维护一个线程集合来重用线程,从而减少线程创建和销毁的开销,提高并发任务的处理效率。CPUThreadPoolExecutor
实现了线程池模式的以下关键要素:
⚝ 线程池管理:负责线程的创建、启动、停止和回收。
⚝ 任务队列:用于存储待执行的任务。
⚝ 任务调度:负责从任务队列中取出任务,并分配给工作线程执行。
⚝ 线程复用:工作线程执行完任务后不会立即销毁,而是返回线程池等待执行新的任务。
总结
folly::CPUThreadPoolExecutor
在类结构上保持了简洁和独立,没有复杂的继承关系。它主要依赖于 folly
库提供的 Executor
接口、BlockingQueue
和 ThreadFactory
等组件,以及标准库的 std::thread
来实现线程池的核心功能。其设计体现了线程池设计模式的思想,旨在提供一个高效、易用、可配置的 CPU 密集型任务线程池解决方案。理解其类结构和关联组件,有助于我们深入理解其内部工作原理和使用方法。
5.2 公有成员函数(Public Member Functions)详解
folly::CPUThreadPoolExecutor
提供了丰富的公有成员函数,用于创建、配置、启动、停止和管理线程池,以及提交和执行任务。本节将对 CPUThreadPoolExecutor
的主要公有成员函数进行详细解析,帮助读者全面掌握其 API 使用方法。
5.2.1 构造函数与析构函数(Constructor and Destructor)
CPUThreadPoolExecutor
提供了多个构造函数,允许用户以不同的方式创建和初始化线程池。
① 默认构造函数
1
explicit CPUThreadPoolExecutor(size_t numThreads);
⚝ 功能:创建一个具有指定数量工作线程的 CPUThreadPoolExecutor
实例。
⚝ 参数:
▮▮▮▮⚝ numThreads
:size_t
类型,指定线程池中工作线程的数量。这是线程池的核心参数,直接影响线程池的并发处理能力。
⚝ 特点:
▮▮▮▮⚝ 使用默认的任务队列(LinkedBlockingQueue
)和默认的线程工厂(DefaultThreadFactory
)。
▮▮▮▮⚝ 线程池在创建后不会立即启动,需要显式调用 start()
方法启动。
⚝ 适用场景:
▮▮▮▮⚝ 适用于大多数基本场景,用户只需要指定线程池的大小即可。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
4
int main() {
5
// 创建一个包含 4 个工作线程的线程池
6
folly::CPUThreadPoolExecutor pool(4);
7
std::cout << "CPUThreadPoolExecutor created with 4 threads." << std::endl;
8
return 0;
9
}
② 带自定义线程工厂的构造函数
1
CPUThreadPoolExecutor(
2
size_t numThreads,
3
std::shared_ptr<ThreadFactory> threadFactory);
⚝ 功能:创建一个具有指定数量工作线程和自定义线程工厂的 CPUThreadPoolExecutor
实例。
⚝ 参数:
▮▮▮▮⚝ numThreads
:size_t
类型,指定线程池中工作线程的数量。
▮▮▮▮⚝ threadFactory
:std::shared_ptr<ThreadFactory>
类型,指向自定义线程工厂的智能指针。ThreadFactory
接口允许用户自定义线程的创建过程,例如设置线程名称、优先级等。
⚝ 特点:
▮▮▮▮⚝ 允许用户自定义线程的创建方式,更加灵活。
▮▮▮▮⚝ 线程池在创建后不会立即启动,需要显式调用 start()
方法启动。
⚝ 适用场景:
▮▮▮▮⚝ 当需要定制工作线程的属性时,例如设置线程名称以便于监控和调试,或者需要设置线程优先级以满足特定调度需求时。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/ThreadFactory.h>
3
#include <iostream>
4
5
// 自定义线程工厂
6
class MyThreadFactory : public folly::ThreadFactory {
7
public:
8
std::thread* newThread(folly::Func<void()> func) override {
9
std::thread* thread = new std::thread(func);
10
// 设置线程名称 (平台相关,此处为示例)
11
// pthread_setname_np(thread->native_handle(), "MyWorkerThread");
12
std::cout << "Thread created by MyThreadFactory." << std::endl;
13
return thread;
14
}
15
};
16
17
int main() {
18
// 创建自定义线程工厂
19
auto threadFactory = std::make_shared<MyThreadFactory>();
20
// 创建一个包含 4 个工作线程,并使用自定义线程工厂的线程池
21
folly::CPUThreadPoolExecutor pool(4, threadFactory);
22
std::cout << "CPUThreadPoolExecutor created with custom ThreadFactory." << std::endl;
23
return 0;
24
}
③ 带自定义任务队列的构造函数
1
CPUThreadPoolExecutor(
2
size_t numThreads,
3
std::shared_ptr<BlockingQueue<Func>> taskQueue);
⚝ 功能:创建一个具有指定数量工作线程和自定义任务队列的 CPUThreadPoolExecutor
实例。
⚝ 参数:
▮▮▮▮⚝ numThreads
:size_t
类型,指定线程池中工作线程的数量。
▮▮▮▮⚝ taskQueue
:std::shared_ptr<BlockingQueue<Func>>
类型,指向自定义任务队列的智能指针。BlockingQueue
接口定义了线程安全、阻塞式的队列操作,folly
库提供了多种 BlockingQueue
的实现,用户也可以自定义实现。
⚝ 特点:
▮▮▮▮⚝ 允许用户自定义任务队列的类型和特性,例如选择有界队列或无界队列,或者使用优先级队列。
▮▮▮▮⚝ 线程池在创建后不会立即启动,需要显式调用 start()
方法启动。
⚝ 适用场景:
▮▮▮▮⚝ 当需要使用特定类型的任务队列时,例如需要限制任务队列的大小以防止内存溢出,或者需要实现任务优先级调度时。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/synchronization/LifoBlockingQueue.h> // LifoBlockingQueue 示例
3
#include <iostream>
4
5
int main() {
6
// 创建 LIFO (后进先出) 任务队列
7
auto taskQueue = std::make_shared<folly::LifoBlockingQueue<folly::Func<void()>>>();
8
// 创建一个包含 4 个工作线程,并使用 LIFO 任务队列的线程池
9
folly::CPUThreadPoolExecutor pool(4, taskQueue);
10
std::cout << "CPUThreadPoolExecutor created with custom LifoBlockingQueue." << std::endl;
11
return 0;
12
}
④ 带所有自定义组件的构造函数
1
CPUThreadPoolExecutor(
2
size_t numThreads,
3
std::shared_ptr<ThreadFactory> threadFactory,
4
std::shared_ptr<BlockingQueue<Func>> taskQueue);
⚝ 功能:创建一个具有指定数量工作线程、自定义线程工厂和自定义任务队列的 CPUThreadPoolExecutor
实例。
⚝ 参数:
▮▮▮▮⚝ numThreads
:size_t
类型,指定线程池中工作线程的数量。
▮▮▮▮⚝ threadFactory
:std::shared_ptr<ThreadFactory>
类型,指向自定义线程工厂的智能指针。
▮▮▮▮⚝ taskQueue
:std::shared_ptr<BlockingQueue<Func>>
类型,指向自定义任务队列的智能指针。
⚝ 特点:
▮▮▮▮⚝ 提供最大的灵活性,允许用户完全自定义线程池的各个核心组件。
▮▮▮▮⚝ 线程池在创建后不会立即启动,需要显式调用 start()
方法启动。
⚝ 适用场景:
▮▮▮▮⚝ 适用于需要高度定制化线程池的复杂场景,例如需要同时定制线程属性和任务队列类型,以满足特定的性能或功能需求。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/ThreadFactory.h>
3
#include <folly/synchronization/LifoBlockingQueue.h>
4
#include <iostream>
5
6
// 自定义线程工厂 (同上例)
7
class MyThreadFactory : public folly::ThreadFactory { /* ... */ };
8
9
int main() {
10
// 创建自定义线程工厂
11
auto threadFactory = std::make_shared<MyThreadFactory>();
12
// 创建 LIFO 任务队列 (同上例)
13
auto taskQueue = std::make_shared<folly::LifoBlockingQueue<folly::Func<void()>>>();
14
// 创建一个包含 4 个工作线程,使用自定义线程工厂和 LIFO 任务队列的线程池
15
folly::CPUThreadPoolExecutor pool(4, threadFactory, taskQueue);
16
std::cout << "CPUThreadPoolExecutor created with custom ThreadFactory and LifoBlockingQueue." << std::endl;
17
return 0;
18
}
⑤ 析构函数 ~CPUThreadPoolExecutor()
1
~CPUThreadPoolExecutor();
⚝ 功能:销毁 CPUThreadPoolExecutor
实例,释放线程池占用的资源。
⚝ 特点:
▮▮▮▮⚝ 析构函数会自动调用 stop()
方法停止线程池,并等待所有正在执行的任务完成。
▮▮▮▮⚝ 确保线程池在销毁前处于安全停止状态,避免资源泄漏或程序崩溃。
⚝ 注意事项:
▮▮▮▮⚝ 在程序结束前,应确保 CPUThreadPoolExecutor
实例被正确销毁,以释放线程资源。通常情况下,当 CPUThreadPoolExecutor
对象超出作用域时,析构函数会自动被调用。
5.2.2 任务管理相关函数(Task Management Related Functions)
CPUThreadPoolExecutor
提供了一系列函数用于提交任务到线程池并管理任务的执行。
① add()
方法:提交任务
1
void add(Func func);
⚝ 功能:向线程池提交一个任务。
⚝ 参数:
▮▮▮▮⚝ func
:folly::Func<void()>
类型,表示要执行的任务,通常是一个无参数、无返回值的函数对象(例如 lambda 表达式、std::function)。
⚝ 返回值:void
,任务提交是异步的,add()
方法立即返回,不等待任务执行完成。
⚝ 特点:
▮▮▮▮⚝ 任务会被添加到线程池的任务队列中,等待工作线程执行。
▮▮▮▮⚝ 如果线程池已满或正在停止,任务提交可能会受到影响,具体行为取决于任务队列的类型和线程池的状态。
⚝ 适用场景:
▮▮▮▮⚝ 适用于提交不需要立即获取结果的异步任务。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void myTask(int taskID) {
7
std::cout << "Task " << taskID << " started by thread " << std::this_thread::get_id() << std::endl;
8
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时任务
9
std::cout << "Task " << taskID << " finished by thread " << std::this_thread::get_id() << std::endl;
10
}
11
12
int main() {
13
folly::CPUThreadPoolExecutor pool(2);
14
pool.start(); // 启动线程池
15
16
for (int i = 1; i <= 5; ++i) {
17
pool.add([i] { myTask(i); }); // 提交 5 个任务
18
std::cout << "Task " << i << " submitted." << std::endl;
19
}
20
21
std::this_thread::sleep_for(std::chrono::seconds(5)); // 等待任务执行完成 (实际应用中应使用更可靠的同步机制)
22
pool.stop(); // 停止线程池
23
return 0;
24
}
② post()
方法:提交任务 (别名)
1
void post(Func func);
⚝ 功能:与 add()
方法完全相同,是 add()
方法的别名。
⚝ 参数 和 返回值:与 add()
方法一致。
⚝ 特点 和 适用场景:与 add()
方法一致。
⚝ 使用建议:add()
和 post()
可以互换使用,选择哪个方法取决于个人偏好或代码风格。在 folly
库中,post()
方法更常见于 Executor
接口的实现中,因此使用 post()
可能更符合 folly
的编程习惯。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void myTask(int taskID) { /* ... */ } // 任务函数 (同上例)
7
8
int main() {
9
folly::CPUThreadPoolExecutor pool(2);
10
pool.start(); // 启动线程池
11
12
for (int i = 1; i <= 5; ++i) {
13
pool.post([i] { myTask(i); }); // 使用 post() 提交任务
14
std::cout << "Task " << i << " submitted using post()." << std::endl;
15
}
16
17
std::this_thread::sleep_for(std::chrono::seconds(5));
18
pool.stop();
19
return 0;
20
}
③ getStats()
方法:获取线程池统计信息
1
ExecutorStats getStats() const;
⚝ 功能:获取线程池的统计信息,例如已完成的任务数、正在等待的任务数等。
⚝ 参数:无。
⚝ 返回值:ExecutorStats
结构体,包含线程池的统计数据。ExecutorStats
结构体的具体成员可能包含:
▮▮▮▮⚝ tasksQueued
:已添加到任务队列的任务总数。
▮▮▮▮⚝ tasksRunning
:当前正在执行的任务数。
▮▮▮▮⚝ tasksFinished
:已完成的任务数。
▮▮▮▮⚝ workersIdle
:空闲工作线程数。
▮▮▮▮⚝ workersActive
:正在执行任务的工作线程数。
▮▮▮▮⚝ workersTotal
:线程池中工作线程的总数。
⚝ 特点:
▮▮▮▮⚝ 返回的是线程池的快照信息,反映了调用 getStats()
方法时的线程池状态。
▮▮▮▮⚝ 可以用于监控线程池的运行状态和性能指标。
⚝ 适用场景:
▮▮▮▮⚝ 用于性能监控、资源利用率分析、线程池调优等场景。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void myTask(int taskID) { /* ... */ } // 任务函数 (同上例)
7
8
int main() {
9
folly::CPUThreadPoolExecutor pool(2);
10
pool.start();
11
12
for (int i = 1; i <= 3; ++i) {
13
pool.add([i] { myTask(i); });
14
}
15
16
std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待部分任务执行
17
18
folly::ExecutorStats stats = pool.getStats();
19
std::cout << "Thread Pool Stats:" << std::endl;
20
std::cout << " Tasks Queued: " << stats.tasksQueued << std::endl;
21
std::cout << " Tasks Running: " << stats.tasksRunning << std::endl;
22
std::cout << " Tasks Finished: " << stats.tasksFinished << std::endl;
23
std::cout << " Workers Idle: " << stats.workersIdle << std::endl;
24
std::cout << " Workers Active: " << stats.workersActive << std::endl;
25
std::cout << " Workers Total: " << stats.workersTotal << std::endl;
26
27
pool.stop();
28
return 0;
29
}
5.2.3 线程池控制函数(Thread Pool Control Functions)
CPUThreadPoolExecutor
提供了一组函数用于控制线程池的生命周期,包括启动、停止、调整线程数量等。
① start()
方法:启动线程池
1
void start();
⚝ 功能:启动线程池,开始接受和执行任务。
⚝ 参数:无。
⚝ 返回值:void
。
⚝ 特点:
▮▮▮▮⚝ 线程池在创建后默认处于停止状态,需要显式调用 start()
方法才能启动。
▮▮▮▮⚝ 启动线程池会创建并启动工作线程,使其开始从任务队列中获取任务并执行。
▮▮▮▮⚝ 可以多次调用 start()
方法,但只有第一次调用会生效,后续调用会被忽略。
⚝ 注意事项:
▮▮▮▮⚝ 在向线程池提交任务之前,必须先调用 start()
方法启动线程池,否则任务将无法被执行。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
4
int main() {
5
folly::CPUThreadPoolExecutor pool(2);
6
std::cout << "Thread pool created, but not started yet." << std::endl;
7
8
// pool.add([]{ /* ... */ }); // 在 start() 之前提交任务不会被执行
9
10
pool.start(); // 启动线程池
11
std::cout << "Thread pool started." << std::endl;
12
13
pool.add([]{ std::cout << "Task executed after start()." << std::endl; }); // 提交任务,会被执行
14
15
pool.stop();
16
return 0;
17
}
② stop()
方法:停止线程池
1
void stop();
⚝ 功能:停止线程池,使其不再接受新的任务,并等待正在执行的任务完成。
⚝ 参数:无。
⚝ 返回值:void
。
⚝ 特点:
▮▮▮▮⚝ 调用 stop()
方法后,线程池会拒绝接受新的任务提交。
▮▮▮▮⚝ 线程池会等待所有正在执行的任务完成,然后停止所有工作线程。
▮▮▮▮⚝ 这是一个阻塞操作,调用线程会等待直到线程池完全停止。
▮▮▮▮⚝ 可以多次调用 stop()
方法,但只有第一次调用会生效,后续调用会被忽略。
⚝ 注意事项:
▮▮▮▮⚝ 在程序退出前,应该调用 stop()
方法停止线程池,以确保所有任务都已完成,并释放线程资源。
▮▮▮▮⚝ 如果需要在停止线程池后立即销毁 CPUThreadPoolExecutor
对象,析构函数会自动调用 stop()
方法。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void myTask(int taskID) { /* ... */ } // 任务函数 (同前例)
7
8
int main() {
9
folly::CPUThreadPoolExecutor pool(2);
10
pool.start();
11
12
for (int i = 1; i <= 3; ++i) {
13
pool.add([i] { myTask(i); });
14
}
15
16
std::cout << "Stopping thread pool..." << std::endl;
17
pool.stop(); // 停止线程池,等待任务完成
18
std::cout << "Thread pool stopped." << std::endl;
19
20
// pool.add([]{ /* ... */ }); // 在 stop() 之后提交任务会被拒绝
21
22
return 0;
23
}
③ join()
方法:等待线程池停止
1
void join();
⚝ 功能:等待线程池完全停止。
⚝ 参数:无。
⚝ 返回值:void
。
⚝ 特点:
▮▮▮▮⚝ join()
方法通常在 stop()
方法之后调用,用于确保线程池已经完全停止,所有工作线程都已退出。
▮▮▮▮⚝ 这是一个阻塞操作,调用线程会等待直到线程池完全停止。
▮▮▮▮⚝ 如果线程池已经停止,join()
方法会立即返回。
⚝ 适用场景:
▮▮▮▮⚝ 当需要确保线程池在程序退出前完全停止时,例如在主线程等待所有后台线程池任务完成后再退出程序。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void myTask(int taskID) { /* ... */ } // 任务函数 (同前例)
7
8
int main() {
9
folly::CPUThreadPoolExecutor pool(2);
10
pool.start();
11
12
for (int i = 1; i <= 3; ++i) {
13
pool.add([i] { myTask(i); });
14
}
15
16
std::cout << "Stopping thread pool..." << std::endl;
17
pool.stop(); // 停止线程池
18
std::cout << "Waiting for thread pool to join..." << std::endl;
19
pool.join(); // 等待线程池完全停止
20
std::cout << "Thread pool joined." << std::endl;
21
22
return 0;
23
}
④ setNumThreads()
方法:动态调整线程池大小
1
void setNumThreads(size_t numThreads);
⚝ 功能:动态调整线程池的工作线程数量。
⚝ 参数:
▮▮▮▮⚝ numThreads
:size_t
类型,指定新的线程池工作线程数量。
⚝ 返回值:void
。
⚝ 特点:
▮▮▮▮⚝ 可以在线程池运行期间动态调整线程数量,以适应不同的负载需求。
▮▮▮▮⚝ 如果新的线程数量大于当前数量,线程池会创建新的工作线程。
▮▮▮▮⚝ 如果新的线程数量小于当前数量,线程池会尝试停止多余的工作线程(可能需要等待正在执行的任务完成)。
▮▮▮▮⚝ 调整线程数量是一个相对耗时的操作,应避免频繁调整。
⚝ 适用场景:
▮▮▮▮⚝ 适用于负载波动较大的场景,可以根据实时负载动态调整线程池大小,提高资源利用率和响应速度。
示例:
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void myTask(int taskID) { /* ... */ } // 任务函数 (同前例)
7
8
int main() {
9
folly::CPUThreadPoolExecutor pool(2); // 初始 2 个线程
10
pool.start();
11
12
for (int i = 1; i <= 3; ++i) {
13
pool.add([i] { myTask(i); });
14
}
15
16
std::cout << "Current thread count: " << pool.getStats().workersTotal << std::endl;
17
18
std::this_thread::sleep_for(std::chrono::seconds(2)); // 等待一段时间
19
20
std::cout << "Setting thread count to 4..." << std::endl;
21
pool.setNumThreads(4); // 动态调整为 4 个线程
22
std::cout << "New thread count: " << pool.getStats().workersTotal << std::endl;
23
24
for (int i = 4; i <= 6; ++i) {
25
pool.add([i] { myTask(i); });
26
}
27
28
std::this_thread::sleep_for(std::chrono::seconds(3));
29
pool.stop();
30
return 0;
31
}
5.3 保护成员函数(Protected Member Functions)解析
folly::CPUThreadPoolExecutor
也提供了一些保护成员函数,这些函数主要用于子类扩展或定制线程池的行为。对于普通用户来说,通常不需要直接使用或关注这些保护成员函数。但对于希望深入理解 CPUThreadPoolExecutor
内部实现或进行高级定制的开发者,了解这些保护成员函数是有益的。
由于 CPUThreadPoolExecutor
没有公开的继承体系,用户通常不会直接继承 CPUThreadPoolExecutor
类。因此,保护成员函数对于外部用户来说,通常是不可见的和不可直接调用的。
常见的保护成员函数类型 (根据线程池的通用设计模式推测,具体 folly::CPUThreadPoolExecutor
的保护成员函数需要查阅源代码):
① 任务队列操作相关
⚝ getTask()
:从任务队列中获取一个待执行的任务。
⚝ putTask(Func func)
:将任务添加到任务队列中。
⚝ beforeExecute(std::thread& thread, Func func)
:在工作线程执行任务之前调用的钩子函数,可以用于执行一些任务执行前的准备工作,例如设置线程上下文。
⚝ afterExecute(Func func, std::exception_ptr exception)
:在工作线程执行任务之后调用的钩子函数,可以用于执行一些任务执行后的清理工作,例如记录任务执行日志、处理任务异常。
② 线程管理相关
⚝ onThreadStarted(std::thread& thread)
:当工作线程启动时调用的钩子函数。
⚝ onThreadStopped(std::thread& thread)
:当工作线程停止时调用的钩子函数。
⚝ adjustThreadCount()
:用于调整线程池工作线程数量的内部方法,setNumThreads()
公有方法可能会调用此保护方法。
③ 状态管理相关
⚝ isRunning()
:检查线程池是否正在运行。
⚝ isStopping()
:检查线程池是否正在停止。
⚝ isStopped()
:检查线程池是否已停止。
注意事项:
⚝ 保护成员函数是内部实现细节,可能会在 folly
库的后续版本中发生变化,不建议直接依赖这些保护成员函数进行开发。
⚝ 如果需要定制 CPUThreadPoolExecutor
的行为,更推荐使用构造函数参数提供的配置选项,例如自定义 ThreadFactory
和 BlockingQueue
,或者通过组合其他 folly
库的组件来实现更高级的功能。
⚝ 查阅 folly::CPUThreadPoolExecutor
的官方文档或源代码是了解其具体保护成员函数的最可靠方式。
总结
folly::CPUThreadPoolExecutor
的保护成员函数主要用于内部实现和子类扩展,对于普通用户来说通常是不可见的和不需要直接使用的。了解这些保护成员函数有助于深入理解线程池的内部工作原理,但进行定制化开发时应谨慎使用,并优先考虑使用公开的配置选项和组件组合方式。
5.4 示例代码与 API 使用场景(Example Code and API Usage Scenarios)
本节将通过一系列示例代码,演示 folly::CPUThreadPoolExecutor
常用 API 的使用场景,帮助读者更好地理解和应用这些 API。
示例 1:基本任务提交与执行
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void simpleTask(int taskID) {
7
std::cout << "Simple Task " << taskID << " started by thread " << std::this_thread::get_id() << std::endl;
8
std::this_thread::sleep_for(std::chrono::seconds(1));
9
std::cout << "Simple Task " << taskID << " finished by thread " << std::this_thread::get_id() << std::endl;
10
}
11
12
int main() {
13
folly::CPUThreadPoolExecutor pool(3); // 创建包含 3 个线程的线程池
14
pool.start(); // 启动线程池
15
16
for (int i = 1; i <= 6; ++i) {
17
pool.add([i] { simpleTask(i); }); // 提交 6 个简单任务
18
}
19
20
std::cout << "All simple tasks submitted." << std::endl;
21
std::this_thread::sleep_for(std::chrono::seconds(3)); // 等待任务执行完成
22
pool.stop(); // 停止线程池
23
std::cout << "Thread pool stopped." << std::endl;
24
return 0;
25
}
使用场景:
⚝ 这是最基本的 CPUThreadPoolExecutor
使用场景,演示了如何创建线程池、启动线程池、提交任务和停止线程池。
⚝ 适用于执行简单的、无返回值的异步任务。
示例 2:使用 getStats()
监控线程池状态
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void taskWithStats(int taskID) {
7
std::cout << "Task with Stats " << taskID << " started." << std::endl;
8
std::this_thread::sleep_for(std::chrono::seconds(2));
9
std::cout << "Task with Stats " << taskID << " finished." << std::endl;
10
}
11
12
int main() {
13
folly::CPUThreadPoolExecutor pool(2);
14
pool.start();
15
16
for (int i = 1; i <= 4; ++i) {
17
pool.add([i] { taskWithStats(i); });
18
}
19
20
// 循环监控线程池状态
21
for (int i = 0; i < 5; ++i) {
22
folly::ExecutorStats stats = pool.getStats();
23
std::cout << "--- Thread Pool Stats (Iteration " << i + 1 << ") ---" << std::endl;
24
std::cout << " Tasks Queued: " << stats.tasksQueued << std::endl;
25
std::cout << " Tasks Running: " << stats.tasksRunning << std::endl;
26
std::cout << " Tasks Finished: " << stats.tasksFinished << std::endl;
27
std::cout << " Workers Active: " << stats.workersActive << std::endl;
28
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 短暂等待
29
}
30
31
pool.stop();
32
return 0;
33
}
使用场景:
⚝ 演示了如何使用 getStats()
方法获取线程池的统计信息,并进行简单的监控。
⚝ 适用于需要实时了解线程池运行状态的场景,例如性能监控、负载分析等。
示例 3:动态调整线程池大小
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <iostream>
3
#include <chrono>
4
#include <thread>
5
6
void dynamicSizeTask(int taskID) {
7
std::cout << "Dynamic Size Task " << taskID << " started." << std::endl;
8
std::this_thread::sleep_for(std::chrono::seconds(1));
9
std::cout << "Dynamic Size Task " << taskID << " finished." << std::endl;
10
}
11
12
int main() {
13
folly::CPUThreadPoolExecutor pool(2); // 初始 2 个线程
14
pool.start();
15
16
for (int i = 1; i <= 4; ++i) {
17
pool.add([i] { dynamicSizeTask(i); });
18
}
19
20
std::cout << "Initial thread count: " << pool.getStats().workersTotal << std::endl;
21
std::this_thread::sleep_for(std::chrono::seconds(2));
22
23
pool.setNumThreads(4); // 动态调整为 4 个线程
24
std::cout << "Thread count adjusted to 4." << std::endl;
25
std::cout << "New thread count: " << pool.getStats().workersTotal << std::endl;
26
27
for (int i = 5; i <= 8; ++i) {
28
pool.add([i] { dynamicSizeTask(i); });
29
}
30
31
std::this_thread::sleep_for(std::chrono::seconds(3));
32
pool.stop();
33
return 0;
34
}
使用场景:
⚝ 演示了如何使用 setNumThreads()
方法动态调整线程池的大小。
⚝ 适用于负载波动较大的场景,可以根据负载变化动态调整线程池大小,提高资源利用率。
示例 4:自定义线程工厂
1
#include <folly/executors/CPUThreadPoolExecutor.h>
2
#include <folly/executors/ThreadFactory.h>
3
#include <iostream>
4
#include <thread>
5
6
class NamedThreadFactory : public folly::ThreadFactory {
7
public:
8
NamedThreadFactory(const std::string& namePrefix) : namePrefix_(namePrefix), threadCount_(0) {}
9
10
std::thread* newThread(folly::Func<void()> func) override {
11
std::string threadName = namePrefix_ + std::to_string(++threadCount_);
12
std::thread* thread = new std::thread([func, threadName]() {
13
// 设置线程名称 (平台相关,此处为示例)
14
// pthread_setname_np(pthread_self(), threadName.c_str());
15
std::cout << "Thread '" << threadName << "' started." << std::endl;
16
func();
17
std::cout << "Thread '" << threadName << "' finished." << std::endl;
18
});
19
return thread;
20
}
21
22
private:
23
std::string namePrefix_;
24
int threadCount_;
25
};
26
27
void namedThreadTask() {
28
std::this_thread::sleep_for(std::chrono::seconds(1));
29
}
30
31
int main() {
32
auto threadFactory = std::make_shared<NamedThreadFactory>("MyWorkerThread-");
33
folly::CPUThreadPoolExecutor pool(2, threadFactory); // 使用自定义线程工厂
34
pool.start();
35
36
for (int i = 0; i < 4; ++i) {
37
pool.add([] { namedThreadTask(); });
38
}
39
40
std::this_thread::sleep_for(std::chrono::seconds(3));
41
pool.stop();
42
return 0;
43
}
使用场景:
⚝ 演示了如何使用自定义 ThreadFactory
来创建具有特定名称的工作线程。
⚝ 适用于需要区分不同类型线程,或者在监控工具中需要根据线程名称进行过滤和分析的场景。
总结
本节通过多个示例代码,展示了 folly::CPUThreadPoolExecutor
常用 API 的使用方法和应用场景,包括任务提交、状态监控、动态调整线程池大小和自定义线程工厂等。这些示例代码可以帮助读者快速上手 CPUThreadPoolExecutor
,并将其应用到实际的并发编程场景中。在实际应用中,可以根据具体需求灵活组合和使用这些 API,构建高效、可靠的并发程序。
END_OF_CHAPTER
6. chapter 6: folly::CPUThreadPoolExecutor
与其他线程池的比较
6.1 与标准库线程池的对比(Comparison with Standard Library Thread Pool)
在现代 C++ 开发中,标准库(Standard Library)提供了 std::thread
以及相关的并发工具,例如 std::async
、std::future
和 std::packaged_task
等,但截至目前(C++20 标准),C++ 标准库并没有直接提供线程池的官方实现。虽然未来标准可能会引入线程池,但目前开发者通常需要依赖第三方库或自行实现线程池。因此,当我们将 folly::CPUThreadPoolExecutor
与“标准库线程池”进行比较时,实际上是在对比 folly::CPUThreadPoolExecutor
与开发者可能使用标准库组件(如 std::thread
, std::async
)手动构建或模拟的线程池,以及一些准标准库或被广泛认可的替代方案。
① 功能与定位:
⚝ 标准库组件 (std::thread
, std::async
等):
▮▮▮▮⚝ 定位:提供基础的线程创建、同步和异步执行能力,是构建并发程序的基石。std::async
可以看作一种简单的任务提交方式,但它更侧重于异步任务的启动和结果获取,而非线程池的管理和复用。
▮▮▮▮⚝ 功能:允许开发者直接创建和管理线程,提供了 future
和 promise
用于异步结果的获取和传递,以及 packaged_task
用于封装可调用对象以便异步执行。
▮▮▮▮⚝ 线程池特性:标准库本身不直接提供线程池。开发者需要利用标准库提供的线程和同步原语(如互斥锁、条件变量等)自行构建线程池。这需要较高的并发编程技巧和对线程池原理的深入理解。
⚝ folly::CPUThreadPoolExecutor
:
▮▮▮▮⚝ 定位:专为 CPU 密集型任务设计的高性能线程池实现,是 folly
库中重要的并发组件。
▮▮▮▮⚝ 功能:提供完整的线程池管理功能,包括线程的创建、复用、任务队列管理、线程池生命周期控制、任务优先级、监控指标等。它封装了线程管理的复杂性,让开发者可以更专注于任务的提交和执行。
▮▮▮▮⚝ 线程池特性:内置线程池管理机制,自动处理线程的创建和销毁,提供多种配置选项以适应不同的应用场景,例如动态调整线程池大小、自定义任务队列等。
② 易用性与灵活性:
⚝ 标准库组件:
▮▮▮▮⚝ 易用性:基础组件相对简单易用,但构建一个健壮且高效的线程池需要较高的开发成本和时间投入。使用 std::async
可以快速启动异步任务,但缺乏对线程池的细粒度控制。
▮▮▮▮⚝ 灵活性:提供了最大的灵活性,开发者可以完全自定义线程池的行为,例如线程创建策略、任务调度算法、资源管理方式等。然而,这种灵活性也意味着更高的复杂性和出错的可能性。
⚝ folly::CPUThreadPoolExecutor
:
▮▮▮▮⚝ 易用性:提供了高级抽象,简化了线程池的使用。通过简单的 API 即可创建、启动、停止线程池,并提交任务。对于常见的使用场景,配置选项已经足够丰富,无需从零开始构建线程池。
▮▮▮▮⚝ 灵活性:虽然不如完全自定义线程池灵活,但 CPUThreadPoolExecutor
仍然提供了相当程度的配置选项,例如可以自定义线程工厂、任务队列、拒绝策略等。可以满足大多数 CPU 密集型任务的需求。
③ 性能:
⚝ 标准库组件:
▮▮▮▮⚝ 性能:标准库组件本身性能良好,但手动构建的线程池性能取决于实现质量。如果实现不当,可能会引入额外的开销,例如频繁的线程创建和销毁、锁竞争等。
▮▮▮▮⚝ 优化:性能优化需要开发者深入理解并发原理和系统底层机制,进行精细的调优。
⚝ folly::CPUThreadPoolExecutor
:
▮▮▮▮⚝ 性能:经过 Facebook 内部大规模应用验证,CPUThreadPoolExecutor
具有出色的性能。它在设计上就考虑了高性能和低延迟,例如采用了高效的任务队列、优化的线程调度策略等。
▮▮▮▮⚝ 优化:folly
团队持续对其进行性能优化,开发者通常无需进行额外的性能调优即可获得良好的性能表现。
④ 功能特性对比:
特性 | 标准库组件 (手动构建) | folly::CPUThreadPoolExecutor |
---|---|---|
线程池管理 | 需要手动实现 | 内置,自动管理 |
任务队列 | 需要手动实现 | 内置,可配置 |
线程池大小调整 | 需要手动实现 | 支持动态调整 |
任务优先级 | 需要手动实现 | 支持任务优先级 |
异常处理 | 需要手动实现 | 提供异常处理机制 |
监控指标 | 需要手动实现 | 提供性能监控指标 |
配置选项 | 完全自定义 | 提供丰富的配置选项 |
易用性 | 较低 | 较高 |
性能 | 取决于实现质量 | 经过优化,性能优秀 |
依赖 | 无 | 依赖 folly 库 |
⑤ 总结:
⚝ 标准库组件 适合对并发编程有深入理解,需要高度定制化线程池,或者项目规模较小,手动实现线程池成本可控的场景。它提供了最大的灵活性,但需要开发者承担更多的实现和维护工作。
⚝ folly::CPUThreadPoolExecutor
适合需要高性能、稳定可靠的线程池,且项目已经或计划引入 folly
库的场景。它提供了丰富的功能和优异的性能,降低了开发和维护成本,但引入了对 folly
库的依赖。
在选择时,开发者需要权衡项目需求、开发资源、性能要求以及对第三方库的依赖等因素,选择最合适的方案。如果项目对性能有较高要求,且已经使用了 folly
库,那么 folly::CPUThreadPoolExecutor
通常是一个非常好的选择。如果项目较为简单,或者对线程池的定制化需求很高,那么使用标准库组件手动构建线程池也是一种可行的方案。
6.2 与其他开源线程池的对比(Comparison with Other Open Source Thread Pools)
除了标准库组件和 folly::CPUThreadPoolExecutor
,市面上还存在许多优秀的开源线程池实现。这些线程池各有特点,适用于不同的应用场景。本节将 folly::CPUThreadPoolExecutor
与一些常见的开源线程池进行对比,帮助读者更好地理解其定位和优势。
① Boost.Asio Thread Pool:
⚝ 概述:Boost.Asio 是一个强大的跨平台 C++ 库,用于异步输入/输出、定时器和并发操作。Asio 提供了 io_context
和 thread_pool
等组件,可以方便地构建基于线程池的异步程序。
⚝ 对比:
▮▮▮▮⚝ 定位:Boost.Asio 的线程池通常与其异步 I/O 模型结合使用,用于处理 I/O 密集型和 CPU 密集型任务。folly::CPUThreadPoolExecutor
更专注于 CPU 密集型任务的执行。
▮▮▮▮⚝ 集成:Boost.Asio 是一个综合性的库,提供了更广泛的异步编程工具,而 folly
库则更侧重于高性能的基础设施组件。如果项目已经使用了 Boost.Asio 进行异步 I/O 操作,那么使用 Boost.Asio 的线程池可能更自然。
▮▮▮▮⚝ 特性:Boost.Asio 的线程池通常与 io_context
关联,可以方便地处理异步事件和回调。folly::CPUThreadPoolExecutor
在任务优先级、监控指标等方面提供了更丰富的功能。
▮▮▮▮⚝ 性能:两者都经过了广泛的应用和优化,性能都比较优秀。具体的性能表现可能取决于具体的应用场景和配置。
▮▮▮▮⚝ 依赖:Boost.Asio 依赖 Boost 库,folly::CPUThreadPoolExecutor
依赖 folly
库。
② Poco C++ Libraries Thread Pool:
⚝ 概述:Poco C++ Libraries 是一套跨平台的 C++ 类库,提供了网络、数据库、XML、JSON 等多种功能。Poco 也提供了线程池实现,作为其并发编程组件的一部分。
⚝ 对比:
▮▮▮▮⚝ 定位:Poco 的线程池是 Poco 库整体解决方案的一部分,适用于构建基于 Poco 框架的应用。folly::CPUThreadPoolExecutor
是一个独立的、高性能的线程池组件,可以更灵活地集成到不同的项目中。
▮▮▮▮⚝ 集成:Poco 提供了更全面的应用开发框架,包括网络库、数据访问库等。如果项目使用了 Poco 框架,那么使用 Poco 的线程池可以更好地与整个框架集成。
▮▮▮▮⚝ 特性:Poco 的线程池提供了基本的线程池功能,例如任务提交、线程池管理等。folly::CPUThreadPoolExecutor
在高级特性方面,例如动态线程池大小调整、任务优先级、监控指标等方面,通常更为丰富。
▮▮▮▮⚝ 性能:两者都提供了可用的线程池实现,性能表现取决于具体实现和应用场景。folly::CPUThreadPoolExecutor
通常在性能方面有更极致的追求。
▮▮▮▮⚝ 依赖:Poco 依赖 Poco C++ Libraries,folly::CPUThreadPoolExecutor
依赖 folly
库。
③ Qt Thread Pool (QThreadPool
):
⚝ 概述:Qt 是一个流行的跨平台应用程序开发框架,广泛用于 GUI 应用程序开发。Qt 框架也提供了 QThreadPool
类,用于管理线程池。
⚝ 对比:
▮▮▮▮⚝ 定位:Qt 的线程池主要用于 Qt 应用程序的后台任务处理,例如非 GUI 线程执行耗时操作,避免阻塞主线程。folly::CPUThreadPoolExecutor
更通用,不局限于 GUI 应用,适用于各种 CPU 密集型任务。
▮▮▮▮⚝ 集成:Qt 的线程池与 Qt 的信号槽机制和事件循环集成良好,方便在 Qt 应用中进行线程间通信和任务调度。如果项目是 Qt GUI 应用,使用 QThreadPool
可以更好地融入 Qt 框架。
▮▮▮▮⚝ 特性:QThreadPool
提供了基本的线程池功能,例如任务提交、最大线程数设置等。folly::CPUThreadPoolExecutor
在功能丰富度和性能优化方面通常更胜一筹。
▮▮▮▮⚝ 性能:QThreadPool
适用于 GUI 应用的后台任务处理,性能满足一般需求。folly::CPUThreadPoolExecutor
在高性能服务器等场景下可能更具优势。
▮▮▮▮⚝ 依赖:Qt 依赖 Qt 框架,folly::CPUThreadPoolExecutor
依赖 folly
库。
④ 其他开源线程池:
⚝ 除了上述列举的线程池,还有许多其他的开源线程池实现,例如 C++ Taskflow, Intel TBB (Threading Building Blocks) 等。
⚝ C++ Taskflow:专注于任务并行和数据并行,提供了更高级的任务调度和依赖关系管理功能,适用于复杂的并行计算场景。
⚝ Intel TBB:Intel TBB 提供了丰富的并行算法和数据结构,线程池只是其并发编程工具箱的一部分。TBB 强调利用多核处理器的性能,提供了更高级的并行编程模型。
⑤ 功能特性对比 (部分开源线程池):
特性 | folly::CPUThreadPoolExecutor | Boost.Asio Thread Pool | Poco Thread Pool | Qt Thread Pool (QThreadPool ) | C++ Taskflow | Intel TBB Thread Pool |
---|---|---|---|---|---|---|
定位 | CPU 密集型任务 | 异步 I/O, 通用 | Poco 框架集成 | Qt GUI 应用后台任务 | 任务并行 | 通用并行计算 |
任务优先级 | 支持 | 较少支持 | 基本支持 | 无明显支持 | 支持 | 支持 |
动态线程池大小 | 支持 | 较少支持 | 基本支持 | 无明显支持 | 支持 | 支持 |
监控指标 | 丰富 | 较少 | 较少 | 较少 | 较少 | 较少 |
易用性 | 较高 | 中等 | 中等 | 较高 (Qt 应用) | 中等 | 中等 |
性能 | 优秀 | 优秀 | 良好 | 良好 | 优秀 | 优秀 |
依赖 | folly | Boost.Asio | Poco C++ Libraries | Qt Framework | 无 (header-only) | Intel TBB |
⑥ 总结:
⚝ 选择建议:选择线程池时,需要考虑项目需求、技术栈、性能要求以及对第三方库的依赖。
▮▮▮▮⚝ 如果项目已经使用了 folly
库,且需要高性能的 CPU 密集型任务处理,folly::CPUThreadPoolExecutor
是一个非常好的选择。
▮▮▮▮⚝ 如果项目使用了 Boost.Asio 进行异步 I/O,并且需要线程池与异步 I/O 协同工作,Boost.Asio Thread Pool 可能更合适。
▮▮▮▮⚝ 如果项目使用了 Poco 框架,Poco Thread Pool 可以更好地集成到整个框架中。
▮▮▮▮⚝ 如果项目是 Qt GUI 应用,QThreadPool
可以方便地进行后台任务处理。
▮▮▮▮⚝ 如果需要更高级的任务并行和数据并行功能,C++ Taskflow 或 Intel TBB 可能是更好的选择。
⚝ folly::CPUThreadPoolExecutor
的优势:folly::CPUThreadPoolExecutor
在 CPU 密集型任务处理、性能优化、功能丰富度(例如任务优先级、监控指标)方面具有优势。它是一个专注于高性能和稳定性的线程池实现,适用于对性能有较高要求的场景。
6.3 CPUThreadPoolExecutor
的优势与劣势(Advantages and Disadvantages of CPUThreadPoolExecutor
)
folly::CPUThreadPoolExecutor
作为一款高性能的线程池实现,在实际应用中展现出诸多优势,但也存在一些需要考虑的劣势。本节将从多个维度分析 CPUThreadPoolExecutor
的优缺点,帮助读者全面评估其适用性。
① 优势(Advantages):
⚝ 高性能(High Performance):
▮▮▮▮⚝ 专为 CPU 密集型任务优化:CPUThreadPoolExecutor
的设计目标就是高效地执行 CPU 密集型任务。它采用了多种优化策略,例如高效的任务队列、优化的线程调度算法、减少锁竞争等,以最大限度地提升性能。
▮▮▮▮⚝ 低延迟:在处理对延迟敏感的任务时,CPUThreadPoolExecutor
能够快速响应并执行任务,降低任务的执行延迟。
▮▮▮▮⚝ 经过大规模验证:在 Facebook 内部的大规模、高并发环境中经过长期验证,证明了其稳定性和高性能。
⚝ 丰富的功能特性(Rich Features):
▮▮▮▮⚝ 动态线程池大小调整:可以根据系统负载和任务需求动态调整线程池的大小,提高资源利用率和响应速度。
▮▮▮▮⚝ 任务优先级:支持任务优先级管理,可以优先执行重要的或紧急的任务,保证关键任务的及时完成。
▮▮▮▮⚝ 自定义任务队列:允许开发者自定义任务队列的实现,例如使用不同的队列类型或实现自定义的调度策略。
▮▮▮▮⚝ 异常处理机制:提供了完善的异常处理机制,可以捕获和处理任务执行过程中抛出的异常,保证程序的健壮性。
▮▮▮▮⚝ 监控与指标:提供丰富的性能监控指标,例如任务队列长度、活跃线程数、已完成任务数等,方便开发者监控线程池的运行状态和性能表现。
⚝ 与 folly
库的良好集成(Good Integration with folly
Library):
▮▮▮▮⚝ 无缝集成:作为 folly
库的一部分,CPUThreadPoolExecutor
与 folly
库的其他组件(例如 Future
, Promise
, EventBase
等)无缝集成,可以方便地构建基于 folly
的异步并发程序。
▮▮▮▮⚝ 共享基础设施:可以利用 folly
库提供的通用基础设施,例如内存管理、日志系统、配置管理等,提高开发效率和代码质量。
⚝ 易用性(Usability):
▮▮▮▮⚝ 简洁的 API:提供了简洁易用的 API,方便开发者创建、启动、停止线程池,并提交任务。
▮▮▮▮⚝ 合理的默认配置:提供了合理的默认配置,对于常见的使用场景,开发者无需进行复杂的配置即可快速上手。
⚝ 开源与社区支持(Open Source and Community Support):
▮▮▮▮⚝ 开源许可证:采用 Apache 2.0 开源许可证,允许自由使用、修改和分发。
▮▮▮▮⚝ 活跃的社区:folly
库拥有活跃的开发社区,可以获得及时的技术支持和问题解答。
② 劣势(Disadvantages):
⚝ 依赖 folly
库(Dependency on folly
Library):
▮▮▮▮⚝ 引入额外依赖:使用 CPUThreadPoolExecutor
需要引入 folly
库作为依赖。对于一些小型项目或对依赖有严格要求的项目,这可能是一个考虑因素。
▮▮▮▮⚝ 学习成本:folly
库本身是一个庞大而复杂的库,学习和理解 folly
库需要一定的成本。
⚝ 相对较高的复杂性(Relatively High Complexity):
▮▮▮▮⚝ 高级特性:为了实现高性能和丰富的功能,CPUThreadPoolExecutor
的内部实现相对复杂。对于初学者来说,理解其内部机制可能需要一定的学习曲线。
▮▮▮▮⚝ 配置选项:虽然提供了丰富的配置选项,但也可能增加配置的复杂性。开发者需要根据具体需求选择合适的配置参数。
⚝ 可能不是最轻量级的选择(Not the Most Lightweight Option):
▮▮▮▮⚝ 资源占用:相比于一些非常轻量级的线程池实现,CPUThreadPoolExecutor
在资源占用方面可能稍高一些。但这通常是为了换取更高的性能和更丰富的功能。
▮▮▮▮⚝ 编译时间:编译 folly
库本身可能需要较长的编译时间。
⚝ 文档相对较少(Relatively Less Documentation Compared to Standard Library):
▮▮▮▮⚝ 文档分散:folly
库的文档相对分散,可能不如标准库的文档那样集中和完善。开发者可能需要查阅源代码、示例代码和社区资源来获取更详细的信息。
③ 总结:
⚝ CPUThreadPoolExecutor
的优势远大于劣势,尤其是在需要高性能、稳定可靠的 CPU 密集型任务处理的场景下。其高性能、丰富的功能特性、与 folly
库的良好集成以及易用性使其成为一个非常有吸引力的选择。
⚝ 劣势主要集中在对 folly
库的依赖和相对较高的复杂性。开发者需要权衡这些因素,根据项目需求和技术栈做出合适的选择。
⚝ 适用场景:CPUThreadPoolExecutor
特别适合以下场景:
▮▮▮▮⚝ 高性能服务器:需要处理大量并发请求,对性能和延迟有较高要求的服务器应用。
▮▮▮▮⚝ CPU 密集型计算:需要执行复杂的计算任务,充分利用多核处理器性能的应用。
▮▮▮▮⚝ 已使用或计划使用 folly
库的项目:可以更好地利用 folly
库的整体优势。
6.4 适用场景分析与选择建议(Applicable Scenarios Analysis and Selection Recommendations)
选择合适的线程池是构建高效并发程序的关键步骤。folly::CPUThreadPoolExecutor
凭借其独特的优势,在某些场景下表现出色,但在其他场景下,可能存在更合适的选择。本节将深入分析 CPUThreadPoolExecutor
的适用场景,并提供选择建议,帮助读者根据实际需求做出明智的决策。
① 适用场景分析(Applicable Scenarios Analysis):
⚝ 高并发、CPU 密集型任务处理:
▮▮▮▮⚝ 场景描述:服务器后端、数据分析、图像处理、科学计算等需要处理大量并发请求或执行复杂计算任务的场景。这些任务通常是 CPU 密集型的,需要充分利用多核处理器的性能。
▮▮▮▮⚝ 适用性:CPUThreadPoolExecutor
专为 CPU 密集型任务设计,具有高性能和低延迟的特点,能够有效地处理高并发场景下的 CPU 密集型任务。其动态线程池大小调整和任务优先级功能可以进一步优化资源利用率和任务调度。
⚝ 对性能和延迟有较高要求的应用:
▮▮▮▮⚝ 场景描述:在线服务、实时系统、游戏服务器等对响应时间有严格要求的应用。这些应用需要快速响应用户请求,降低任务的执行延迟。
▮▮▮▮⚝ 适用性:CPUThreadPoolExecutor
在性能优化方面做了很多工作,能够提供较低的任务执行延迟。其高效的任务队列和优化的线程调度策略有助于快速处理任务,满足对延迟敏感的应用需求。
⚝ 已使用或计划使用 folly
库的项目:
▮▮▮▮⚝ 场景描述:项目已经使用了 folly
库的其他组件,或者计划引入 folly
库以利用其高性能基础设施。
▮▮▮▮⚝ 适用性:CPUThreadPoolExecutor
与 folly
库的其他组件无缝集成,可以方便地构建基于 folly
的异步并发程序。使用 CPUThreadPoolExecutor
可以更好地利用 folly
库的整体优势,降低集成成本和维护成本。
⚝ 需要精细化控制线程池行为的应用:
▮▮▮▮⚝ 场景描述:需要自定义任务队列、线程工厂、拒绝策略等线程池行为的应用。
▮▮▮▮⚝ 适用性:CPUThreadPoolExecutor
提供了丰富的配置选项,允许开发者自定义线程池的各个方面,例如自定义任务队列、线程工厂、拒绝策略等。这使得开发者可以根据具体需求精细化控制线程池的行为,满足更复杂的应用场景。
⚝ 需要监控线程池运行状态和性能指标的应用:
▮▮▮▮⚝ 场景描述:需要实时监控线程池的运行状态,了解其性能表现,以便进行性能调优和故障排查的应用。
▮▮▮▮⚝ 适用性:CPUThreadPoolExecutor
提供了丰富的性能监控指标,例如任务队列长度、活跃线程数、已完成任务数等。开发者可以通过这些指标监控线程池的运行状态,及时发现和解决性能问题。
② 不适用或不太适用的场景(Inapplicable or Less Applicable Scenarios):
⚝ I/O 密集型任务为主的应用:
▮▮▮▮⚝ 场景描述:网络编程、文件 I/O、数据库操作等 I/O 密集型任务为主的应用。这些任务的性能瓶颈通常在 I/O 操作上,而不是 CPU 计算。
▮▮▮▮⚝ 不适用性:CPUThreadPoolExecutor
专为 CPU 密集型任务优化,在 I/O 密集型任务场景下,其优势可能无法充分发挥。对于 I/O 密集型任务,使用异步 I/O 模型(例如 Boost.Asio, libevent
, libuv
)或专门为 I/O 密集型任务设计的线程池可能更合适。
⚝ 非常轻量级的应用或资源受限的环境:
▮▮▮▮⚝ 场景描述:嵌入式系统、移动设备等资源受限的环境,或者对程序体积和内存占用有严格要求的轻量级应用。
▮▮▮▮⚝ 不太适用性:CPUThreadPoolExecutor
依赖 folly
库,folly
库本身相对庞大,可能会增加程序体积和内存占用。在资源受限的环境下,可能需要考虑更轻量级的线程池实现,或者手动构建简单的线程池。
⚝ 对第三方库依赖有严格限制的项目:
▮▮▮▮⚝ 场景描述:项目有严格的第三方库依赖管理要求,或者需要尽量减少外部依赖。
▮▮▮▮⚝ 不太适用性:CPUThreadPoolExecutor
依赖 folly
库,引入 folly
库可能会增加项目的依赖复杂性。如果项目对第三方库依赖有严格限制,可能需要考虑使用标准库组件手动构建线程池,或者选择其他不依赖大型库的开源线程池。
⚝ 简单的并发任务处理,标准库组件已足够满足需求:
▮▮▮▮⚝ 场景描述:并发任务处理需求较为简单,使用标准库的 std::thread
, std::async
等组件已经能够满足需求,且手动构建线程池的成本可控。
▮▮▮▮⚝ 不太适用性:如果并发需求简单,标准库组件已经足够,并且手动构建线程池的成本不高,那么可能没有必要引入 folly::CPUThreadPoolExecutor
这样功能强大的线程池。
③ 选择建议(Selection Recommendations):
⚝ 优先选择 CPUThreadPoolExecutor
的场景:
▮▮▮▮⚝ 高并发、CPU 密集型任务:服务器后端、数据分析、科学计算等。
▮▮▮▮⚝ 对性能和延迟有较高要求:在线服务、实时系统等。
▮▮▮▮⚝ 已使用或计划使用 folly
库。
▮▮▮▮⚝ 需要精细化控制线程池行为。
▮▮▮▮⚝ 需要监控线程池运行状态和性能指标。
⚝ 考虑其他方案的场景:
▮▮▮▮⚝ I/O 密集型任务为主:考虑异步 I/O 模型或 I/O 密集型线程池。
▮▮▮▮⚝ 资源受限的环境或轻量级应用:考虑更轻量级的线程池实现或手动构建。
▮▮▮▮⚝ 对第三方库依赖有严格限制:考虑标准库组件或无依赖的线程池。
▮▮▮▮⚝ 简单的并发任务处理,标准库组件已足够:优先使用标准库组件。
⚝ 评估指标:在选择线程池时,可以从以下几个方面进行评估:
▮▮▮▮⚝ 性能:线程池的吞吐量、延迟、资源利用率等。
▮▮▮▮⚝ 功能:线程池提供的功能特性,例如任务优先级、动态线程池大小调整、监控指标等。
▮▮▮▮⚝ 易用性:API 的简洁性、文档的完善程度、学习曲线等。
▮▮▮▮⚝ 依赖:线程池的依赖库、依赖版本、依赖复杂性等。
▮▮▮▮⚝ 社区支持:社区的活跃程度、文档的更新频率、问题解答的及时性等。
▮▮▮▮⚝ 成熟度与稳定性:线程池的发布历史、应用规模、bug 修复速度等。
④ 总结:
⚝ 没有银弹:没有一个线程池能够完美适用于所有场景。选择合适的线程池需要根据具体的项目需求、技术栈、性能要求以及资源限制等因素进行综合考虑。
⚝ folly::CPUThreadPoolExecutor
是一个强大的工具:在合适的场景下,folly::CPUThreadPoolExecutor
能够发挥其高性能、功能丰富的优势,帮助开发者构建高效、稳定的并发程序。
⚝ 深入理解需求,谨慎选择:开发者需要深入理解项目的并发需求,仔细评估各种线程池的优缺点,谨慎选择最合适的方案。在某些情况下,甚至可能需要根据项目特点定制化线程池实现。
END_OF_CHAPTER
7. chapter 7: 总结与展望
7.1 CPUThreadPoolExecutor
的核心价值总结(Summary of Core Value of CPUThreadPoolExecutor
)
folly::CPUThreadPoolExecutor
作为 folly
库中重要的并发编程组件,为开发者提供了一个强大、高效且灵活的线程池解决方案。它不仅仅是一个简单的线程池实现,更是对现代并发编程理念的深刻体现和工程实践的结晶。本节将从多个维度总结 CPUThreadPoolExecutor
的核心价值,帮助读者更清晰地认识其在并发编程领域的重要性。
① 高效的任务处理能力:CPUThreadPoolExecutor
旨在充分利用多核 CPU 的计算能力,通过维护一个可复用的线程池来减少线程创建和销毁的开销,从而显著提高任务处理的效率和吞吐量。尤其在高并发、任务密集的场景下,其性能优势尤为突出。
② 灵活的配置与管理:CPUThreadPoolExecutor
提供了丰富的配置选项,允许开发者根据具体的应用场景和性能需求,灵活地调整线程池的大小、任务队列类型、线程工厂等参数。这种高度的灵活性使得 CPUThreadPoolExecutor
能够适应各种复杂的并发场景。
③ 强大的功能特性:除了基本的线程池功能外,CPUThreadPoolExecutor
还支持诸多高级特性,例如:
▮▮▮▮ⓑ 动态线程池大小调整:能够根据系统负载和任务队列的积压情况,动态地调整线程池的大小,以实现最佳的资源利用率和性能表现。
▮▮▮▮ⓒ 任务优先级管理:支持为提交的任务设置优先级,线程池可以根据优先级调度任务的执行顺序,确保重要任务得到优先处理。
▮▮▮▮ⓓ 自定义任务队列:允许开发者根据特定需求,自定义任务队列的实现,例如使用优先级队列、延迟队列等,以满足更复杂的任务调度策略。
▮▮▮▮ⓔ 完善的异常处理机制:提供了健壮的异常处理机制,能够捕获和处理任务执行过程中抛出的异常,保证程序的稳定性和可靠性。
▮▮▮▮ⓕ 全面的监控与指标:内置了丰富的性能指标和监控接口,方便开发者实时监控线程池的运行状态,进行性能分析和调优。
④ 与 folly
库的深度集成:CPUThreadPoolExecutor
与 folly
库的其他组件(如 Future
/Promise
、EventBase
等)无缝集成,可以方便地构建基于异步编程的高性能应用。这种集成性大大简化了并发编程的复杂性,提高了开发效率。
⑤ 广泛的应用场景:CPUThreadPoolExecutor
适用于各种需要并发处理任务的场景,例如:
▮▮▮▮ⓑ 网络服务器:处理客户端请求,提高服务器的并发处理能力。
▮▮▮▮ⓒ 数据处理与分析:并行处理大规模数据集,加速数据分析和计算过程。
▮▮▮▮ⓓ 异步任务调度:执行后台任务、定时任务等,提高系统的响应速度和资源利用率。
▮▮▮▮ⓔ 微服务架构:在微服务中作为任务调度的核心组件,提高服务的并发性和吞吐量。
总而言之,folly::CPUThreadPoolExecutor
不仅仅是一个工具,更是一种并发编程的最佳实践。它以其高效性、灵活性、强大的功能和与 folly
库的深度集成,成为了构建高性能、高并发应用的理想选择。掌握和应用 CPUThreadPoolExecutor
,对于提升软件系统的性能和可维护性具有重要的意义。
7.2 未来发展趋势与展望(Future Development Trends and Prospects)
随着计算机硬件技术的不断发展,特别是多核处理器和异构计算架构的普及,并发编程的重要性日益凸显。folly::CPUThreadPoolExecutor
作为一款优秀的线程池实现,其未来的发展趋势和前景广阔,值得我们深入探讨和展望。
① 更智能化的资源管理:未来的线程池将更加注重智能化的资源管理。例如,通过机器学习和自适应算法,线程池能够更精准地预测任务负载,动态调整线程池大小和资源分配,实现更高效的资源利用率和更低的延迟。这可能包括:
▮▮▮▮ⓑ 预测性伸缩(Predictive Scaling):基于历史负载数据和模式,预测未来的任务量,提前调整线程池大小,避免资源瓶颈。
▮▮▮▮ⓒ 智能任务调度(Intelligent Task Scheduling):根据任务的类型、优先级、资源需求等,智能地调度任务到最合适的线程执行,优化整体性能。
▮▮▮▮ⓓ 异构资源感知(Heterogeneous Resource Awareness):能够感知和利用不同类型的计算资源(如 CPU、GPU、FPGA 等),将任务分配到最适合的硬件上执行,充分发挥异构计算的优势。
② 更强大的监控与诊断能力:未来的线程池将提供更全面、更深入的监控和诊断能力,帮助开发者更好地理解线程池的运行状态,快速定位和解决性能问题。这可能包括:
▮▮▮▮ⓑ 实时性能分析(Real-time Performance Analysis):提供实时的性能指标监控和可视化界面,帮助开发者直观地了解线程池的性能瓶颈。
▮▮▮▮ⓒ 自动化诊断(Automated Diagnostics):能够自动检测线程池的异常状态和性能问题,并提供诊断报告和优化建议。
▮▮▮▮ⓓ Tracing 与 Profiling 集成:更深入地集成 tracing 和 profiling 工具,帮助开发者追踪任务的执行路径和性能瓶颈,进行精细化的性能调优。
③ 与新兴技术的融合:CPUThreadPoolExecutor
未来将更紧密地与新兴技术融合,例如:
▮▮▮▮ⓑ Serverless 计算:在 Serverless 环境下,线程池可以作为函数计算的底层执行引擎,提供高效的并发处理能力。
▮▮▮▮ⓒ AI 与机器学习:结合 AI 和机器学习技术,优化线程池的资源管理和任务调度策略,提高线程池的智能化水平。
▮▮▮▮ⓓ Rust 等新编程语言:folly
库本身也在不断发展,未来可能会有更多基于 Rust 等新编程语言的线程池实现,提供更高的性能和安全性。
④ 更易用、更标准化的 API:为了降低开发者的使用门槛,未来的线程池 API 将更加易用、更符合标准。例如,可能会出现更简洁的配置方式、更友好的错误提示、更完善的文档和示例代码。同时,随着并发编程模型的不断演进,线程池的 API 也可能朝着更标准化的方向发展,例如与 C++ 标准库的 Executor proposal 更好地对齐。
⑤ 更注重安全性和可靠性:在安全性方面,未来的线程池将更加注重防止拒绝服务攻击(DoS)和资源耗尽等安全风险。在可靠性方面,将提供更完善的错误处理和容错机制,确保线程池在各种异常情况下都能稳定运行。
总而言之,folly::CPUThreadPoolExecutor
的未来发展前景广阔。随着技术的进步和应用场景的不断扩展,线程池将在并发编程领域扮演越来越重要的角色。我们有理由相信,未来的 CPUThreadPoolExecutor
将会更加智能、高效、易用和可靠,为构建下一代高性能应用提供更强大的支持。
7.3 最佳实践与建议(Best Practices and Recommendations)
为了充分发挥 folly::CPUThreadPoolExecutor
的优势,并在实际应用中避免常见的陷阱,本节将总结一些最佳实践和建议,供读者参考。
① 合理配置线程池大小:线程池大小的配置是影响性能的关键因素。过小的线程池可能导致任务积压,无法充分利用 CPU 资源;过大的线程池则可能引起过多的上下文切换,反而降低性能。最佳的线程池大小需要根据具体的应用场景和硬件环境进行调优。
▮▮▮▮ⓑ CPU 密集型任务:对于 CPU 密集型任务,线程池大小通常设置为 CPU 核心数或略多于核心数,例如 number_of_cores + 1
。
▮▮▮▮ⓒ IO 密集型任务:对于 IO 密集型任务,线程池大小可以设置得更大一些,例如 2 * number_of_cores
甚至更高,因为线程在等待 IO 操作时可以释放 CPU 资源给其他线程。
▮▮▮▮ⓓ 动态调整:考虑使用 CPUThreadPoolExecutor
提供的动态调整线程池大小的功能,根据系统负载自动伸缩线程池。
② 选择合适的任务队列:任务队列的选择也会影响线程池的性能和行为。
▮▮▮▮ⓑ FIFOQueue
(先进先出队列):默认队列类型,适用于大多数场景,保证任务按照提交顺序执行。
▮▮▮▮ⓒ PriorityQueue
(优先级队列):适用于需要优先处理某些任务的场景,例如需要保证高优先级任务的低延迟。
▮▮▮▮ⓓ LifoSemMPMCQueue
(后进先出多生产者多消费者队列):在某些特定场景下,例如任务之间存在依赖关系,或者需要快速响应最新任务时,LIFO 队列可能更有效。
▮▮▮▮ⓔ 自定义队列:如果内置队列无法满足需求,可以考虑自定义任务队列,例如使用延迟队列实现定时任务调度。
③ 谨慎处理任务异常:任务执行过程中抛出的异常需要妥善处理,避免影响线程池的稳定性和程序的正确性。
▮▮▮▮ⓑ 捕获异常:在任务函数内部使用 try-catch
语句捕获可能抛出的异常。
▮▮▮▮ⓒ 异常处理策略:根据应用场景选择合适的异常处理策略,例如:
▮▮▮▮▮▮▮▮❹ 记录日志:将异常信息记录到日志中,方便后续分析和排查问题。
▮▮▮▮▮▮▮▮❺ 重试任务:对于可重试的异常,可以考虑重新提交任务执行。
▮▮▮▮▮▮▮▮❻ 终止任务:对于不可恢复的异常,可以终止任务执行,并进行相应的错误处理。
▮▮▮▮ⓖ 避免抛出未捕获异常:尽量避免任务函数抛出未捕获的异常,这可能导致程序崩溃或线程池状态异常。
④ 合理使用线程池的生命周期管理:正确管理线程池的生命周期,避免资源泄漏和程序错误。
▮▮▮▮ⓑ 启动线程池:在程序启动或需要使用线程池时,调用 start()
方法启动线程池。
▮▮▮▮ⓒ 停止线程池:在程序退出或不再需要线程池时,调用 stop()
方法停止线程池,并等待所有任务执行完成。
▮▮▮▮ⓓ 避免重复启动和停止:避免在程序运行过程中频繁地启动和停止线程池,这会增加额外的开销。
⑤ 监控线程池性能:定期监控线程池的性能指标,及时发现和解决性能问题。
▮▮▮▮ⓑ 关注关键指标:关注线程池的活跃线程数、任务队列长度、任务执行时间等关键指标。
▮▮▮▮ⓒ 使用监控工具:集成 folly
提供的监控工具或第三方监控系统,实时监控线程池的运行状态。
▮▮▮▮ⓓ 性能调优:根据监控数据进行性能分析和调优,例如调整线程池大小、优化任务调度策略等。
⑥ 结合 folly
库其他组件:充分利用 folly
库的其他组件,例如 Future
/Promise
、EventBase
等,构建更强大、更灵活的并发应用。
▮▮▮▮ⓑ 异步编程:使用 Future
/Promise
进行异步编程,提高程序的响应速度和并发性。
▮▮▮▮ⓒ 事件驱动:结合 EventBase
构建事件驱动的并发模型,适用于 IO 密集型应用。
⑦ 持续学习和实践:并发编程是一个复杂而深入的领域,需要不断学习和实践才能掌握其精髓。
▮▮▮▮ⓑ 深入理解并发原理:学习并发编程的基本概念、原理和模型,例如线程、进程、锁、同步、异步等。
▮▮▮▮ⓒ 阅读源码:阅读 folly::CPUThreadPoolExecutor
的源码,深入理解其实现细节和设计思想。
▮▮▮▮ⓓ 实践项目:通过实际项目应用 CPUThreadPoolExecutor
,积累经验,加深理解。
遵循以上最佳实践和建议,可以帮助开发者更好地使用 folly::CPUThreadPoolExecutor
,构建高效、稳定、可靠的并发应用。同时,也需要根据具体的应用场景和需求,灵活调整和优化,才能充分发挥 CPUThreadPoolExecutor
的潜力。
END_OF_CHAPTER