参考:C++高级编程. (美) Marc Gregoire著, 张永强译. 清华大学出版社. 2015.
文章目录
本文涉及的头文件:
- C++11标准线程库
<thread>
- 原子操作库
<atomic>
- 互斥体库
<mutex>
- 条件变量
<condition_variable>
:用于线程通信 - 时间工具库
<chrono>
CPU核心、进程与线程、并发与并行
查看CPU
图示计算机拥有1块CPU,包含4个独立的物理内核,8个逻辑处理器,即所谓的 单CPU 4核心 8线程。其中逻辑处理器的个数对应的就是线程数,是一个逻辑概念,表示一种能力。由于单个物理核同一时间点只能处理一个线程,所以通常线程数等于核心数。但是可以通过 “超线程技术” Hyper-Threading,使用每个CPU核心没有达到满负荷运载的剩余用量,用一个物理核模拟两个虚拟核,实现每个核处理两个线程。然而这两个虚拟核肯定是比不上真正的物理核的,这也是有些媒体声称 “六核六线程优于四核八线程” 的原因。
进程与线程
对比 | 进程 Process | 线程 Thread |
---|---|---|
定义 | 进程是程序运行的一个实体的运行过程,是系统进行资源分配和调配的一个独立单位 | 线程是进程运行和执行的最小调度单位 |
系统开销 | 创建撤销切换开销大,资源要重新分配和收回 | 仅保存少量寄存器的内容,开销小,在进程的地址空间执行代码 |
拥有资产 | 资源拥有的基本单位 | 基本上不占资源,仅有不可少的资源(程序计数器,一组寄存器和栈) |
调度 | 资源分配的基本单位 | 独立调度分配的单位 |
安全性 | 进程间相互独立,互不影响 | 线程共享一个进程下面的资源,可以互相通信和影响 |
地址空间 | 系统赋予的独立的内存地址空间 | 由相关堆栈寄存器和和线程控制表TCB组成,寄存器可被用来存储线程内的局部变量 |
简单来说,
进程 Process 是在系统中正在运行的应用程序;某软件一旦运行就是一个进程,如浏览器;进程是资源分配的最小单位,是线程的容器。一个进程可以包含多个线程,但肯定存在 主线程 MainThread
线程 Thread 是系统分配处理器时间资源(调度)的基本单元,包含在进程中,是进程中实际运作的最小单元。一个进程中的多个线程可以相互通信、内存共享,每条线程并行执行不同的任务。
并发与并行
并发 Concurrency:由于同一时刻只能有一条指令执行,在单CPU中多个进程指令被快速地轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时执行的,只是把时间分成若干段,使多个进程被快速地交替执行;或者在单核心中多个线程被快速地交替执行。
一条指令和另一条指令交错执行,操作系统实现这种交错执行的机制称为:上下文切换 Context Switch。上下文是指操作系统保持跟踪进程或线程运行所需的所有状态信息,如寄存器文件的当前值、主存内容等
并行 Parallel:指在同一时刻有多条指令被同时执行,可以是多进程分配到多CPU上同时执行,也可以是多线程分配到多核心上。所以无论从微观还是从宏观来看,二者都是一起执行的。
总结来说,
无论是并发还是并行,都只是一种执行顺序的描述,使用者看到的则是多进程、多线程。进程 处理的任务多,每个进程都有独立的内存单元,占用CPU资源相对较少,但进程间切换开销大、不能通信;线程 处理任务相对较少,多个线程共享内存单元,占用资源少,可以相互通信(同步机制),就像同一程序(进程)的两个函数都能访问全局变量。
C++11标准线程库 <thread>
C++11实现了标准的线程库,支持跨平台编程,定义在 <thread> 头文件中。下面介绍线程的创建、取消和异常处理。
namespace std
{
class thread
{
public:
// 构造,可接受任意个数的参数
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
// 复制构造
thread(const thread&) = delete;
thread(thread&&) noexcept;
thread& operator=(const thread&) = delete;
thread& operator=(thread&&) noexcept;
class id;
id get_id(); // 返回线程id
void swap(thread&) noexcept;
bool joinable() const noexcept; // 检查线程是否可合并
void join(); // 阻塞父线程,等待其执行完成
void detach(); // 子线程独立执行
};
namespace this_thread
{
std::thread::id get_id() noexcept;
void yield() noexcept;
template <class Clock, class Duration>
void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);
template <class Rep, class Period>
void sleep_for(const chrono::duration<Rep, Period>& rel_time);
}
}
线程创建
函数指针
函数名即为函数指针,也是函数地址。
void task(¶m1, ¶m2, ...);
std::thread t{ task, param1, std::ref(param2), ... };
t.join(); // 实际应用中需避免使用,会阻塞父线程
需要说明的是:
- 不同线程中访问
std::cout
是 线程安全 的,没有任何竞争风险 - 参数需要引用传递时,必须使用
std::ref
或std::cref
,这是因为C++11的设计者认为函数模板 bind 和 thread 默认应该对参数进行按值传递,禁止了一般的&引用传递。理由是函数模板在创建函数时已经把参数传入,而该函数不确定什么时候执行,如果使用一般引用的话,函数执行前引用参数发生改变,则最终无法得到创建函数时的预想结果。因此需要区别设计,以示提醒。
函数对象
只要实现了operator()的类或者结构体,都可以称为函数对象。相比函数指针,在面向对象编程中,这种实现更有优势。
class Counter
{
public:
Counter(int _id, int _iterations) : id(_id), iterations(_iterations) {};
void operator()() const
{
for (int i=0; i<=iterations; ++i)
{
std::cout << "Counter " << id << " has value:";
std::cout << i << std::endl;
}
}
private:
int id;
int iterations;
};
std::thread t1{ Counter{0, 5} }; // 统一初始化语法
std::thread t2{ Counter{1, 5} };
std::thread t3{ Counter{2, 5} };
t1.join(); // 实际应用中需避免使用,会阻塞父线程
t2.join();
t3.join();
需要说明的是:
- 为避免编译错误,应使用统一初始化语法
- 函数对象总是复制到线程的某个内部存储中
- 函数对象的特定实例如需引用传递,也是必须使用
std::ref
或std::cref
- 所示代码的cout输出存在 “交错” 现象,需要互斥以实现每次只有一个线程读写流对象(后面介绍)
lambda
lambda 表达式用来生成简单的匿名函数。
[函数对象参数] (操作符重载函数参数) mutable 或 exception 声明 -> 返回值类型 {函数体}
- 函数对象参数不能省略
- 操作符重载参数可省
- 返回值类型可省,编译器根据 return 语句自动推断
- 函数体不能省
std::thread t( [param1, param2] {
函数体
} );
t.join(); // 实际应用中需避免使用,会阻塞父线程
类的成员函数
通过这种方式创建线程,可在不同线程中执行某对象的成员函数。但是如果和其他线程访问同一个对象,必须保证访问是线程安全的,避免竞争。
class MyClass
{
public:
MyClass();
task(_param1, _param2);
private:
param1;
param2;
};
MyClass object;
std::thread t{ &MyClass::task, &object }; // &类成员函数 + &实例
t.join(); // 实际应用中需避免使用,会阻塞父线程
读取线程的处理结果
- 传入结果变量的引用
std::ref()
- 将结果存储在类的实例化对象的数据成员中
- 使用下文介绍的
std::future::get()
线程取消
不存在线程取消机制,即无法在一个线程中取消另一个线程。但可以通过线程通信,最简单就是设置共享变量flag,线程自身通过查询flag来决定是否应该终止(线程函数return
即可),注意竞争。
参考:如何终止线程
线程本地存储
通过关键字 thread_local
可 将变量定义为线程本地数据 ,即每个线程都有这个变量的独立副本,不会出现竞争。
thread_local int var1; // 每个线程都有独立的 n 副本
int var2; // 每个线程共享 a 数据,应避免竞争
void task(¶m1, ¶m2, ...);
int main()
{
std::thread t{ task, param1, param2 };
t.join(); // 实际应用中需避免使用,会阻塞父线程
}
如果 thread_local
声明位于函数作用域内,则其定义的变量相当于 static
,同时在每个线程中都有自己的独立副本,且无论函数调用多少次,该变量在每个线程内只初始化一次。
异常处理
异常的捕获和处理可以使用下文介绍的future
,也可以按以下方法手动捕获处理。C++异常处理教程
一个线程的异常不能在另一线程中捕获。可以通过参数引用传递的方式,通过std::exception_ptr
类型的异常变量判断异常std::current_exception()
是否发生,若是则重新抛出异常 std::rethrow_exception
,再捕获处理 try...catch
。这样,异常就从子线程转移到了父线程中。
void task(std::exception_ptr& error)
{
try
{
// do something
throw std::runtime_error("人为异常"); // 主动抛出异常
}
catch(...) // 捕获任何异常
{
std::cout << "线程异常..." << std::endl;
error = std::current_exception();
}
}
int main()
{
try
{
std::exception_ptr error;
std::thread t{ task, std::ref(error) };
t.join();
if(error)
{
std::cout << "收到子线程异常,重新抛出..." << std::endl;
std::rethrow_exception(error);
}
}
catch(const std::exception& e)
{
std::cout << "捕获子线程异常:'" << e.what() << "'" << std::endl;
}
}
其中 std::exception_ptr
的类型是NullablePointer,意味着如果异常不发生则该实例返回的是空指针,所以可用 if语句 来检查。
Race Conditions
A race condition or race hazard is the condition of an electronics, software, or other system where the system’s substantive behavior is dependent on the sequence or timing of other uncontrollable events. It becomes a bug when one or more of the possible behaviors is undesirable. —— 维基百科
竞争冒险(race hazard)又名竞态条件、竞争条件(race condition),它旨在描述一个系统或者进程的输出依赖于不受控制事件的出现顺序或者出现时机。
举例来说,如果计算机中的两个进程 or 线程同时试图修改一个 共享内存 的内容,在没有并发控制的情况下,最后的结果依赖于它们的执行顺序与时机,这会导致Bug出现。
错误代码示例:
用10个线程分别给计数器赋值10次,每次赋值耗时0.1秒。预期输出100,但实际输出87,耗时1.01秒。节省了时间,但发生了计数错误。
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
using namespace std::chrono_literals; // C++14支持用户定义的字面量,如 100ms,定义于内联命名空间 std::literals::chrono_literals
void task_sleep(int& counter)
{
for (int i = 0; i < 10; ++i)
{
++counter;
std::this_thread::sleep_for(100ms);
}
}
int main(int argc, char* argv[])
{
int counter{ 0 };
std::vector<std::thread> threads(10);
for (int i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread{ task_sleep, std::ref(counter) };
}
for (auto& t : threads) t.join();
std::cout << "result: " << counter << std::endl; // 输出 82
return 0;
}
这种竞争冒险常见于不良设计的电子系统,在软件中也比较常见,尤其是有采用 多线程技术 的软件,那么必须分外注意执行顺序。为了避免这个问题,可以禁止线程间共享内存或者提供 同步机制。
同步机制用于保证一次只有一个线程在更改共享内存。在C++11版本中,提供了两种手段:原子操作和显式同步。原子操作 常用来同步简单的标量数据类型和其指针类型,如bool, int, char*;而由 互斥体和锁 实现的显式同步则用于复杂数据。
原子操作 <atomic>
原子操作 Atomic Operation,指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何上下文切换。原子操作的主要特性是 不可分割,可以是一个步骤,也可以是多个步骤的整体,都是一个 最小 操作单元,这正是原子性(atomic)的体现。因此,原子操作具体的内部实现与结构不可被上层操作发现、修改和分割。原子性必须需要硬件的支持,是和CPU架构相关的。
原子操作常用来同步标量数据类型和其指针类型,如bool, int, char*等简单数据。如下代码所示,修补上一节的Bug,使用原子操作进行计数,实际输出100,耗时1.01秒。
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <atomic> // 原子操作
#include <ctime>
using namespace std::chrono_literals; // C++14支持用户定义的字面量,如 10ms
void task_sleep(std::atomic<int>& counter)
{
int temp = 0;
for (int i = 0; i < 10; ++i)
{
//++counter; 应避免频繁的原子操作,使用局部变量替代
++temp;
std::this_thread::sleep_for(100ms);
}
counter += temp;
}
int main(int argc, char* argv[])
{
auto startTime = clock();
std::atomic<int> counter{ 0 }; // 原子操作,或直接定义类型 std::atomic_int16_t
std::vector<std::thread> threads(10);
for (int i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread{ task_sleep, std::ref(counter) };
}
for (auto& t : threads) t.join();
auto endTime = clock();
int result = counter.load(); // 可以取出原子数据,也可以直接当作源类型参与运算
printf("输出%d,耗时%.2fs\n", result, double(endTime - startTime) / 1000.);
return 0;
}
互斥机制 <mutex>
互斥机制用来保证在任一时刻,只能有一个线程读取数据,线程与线程是互斥的。C++中互斥机制定义在 <mutex> 头文件中,包括互斥体类、锁类和call_once函数。关于互斥体类和锁类,我的理解是:互斥体 是共享内存的大门,线程访问数据想要实现互斥就要锁上大门,锁不上就是该数据正在被其他线程读写,因此互斥体需要能够锁定lock和解除unlock锁定。而 锁 类则是管理大门的方式,接管互斥体,用来自动lock和unlock,就不需要互斥体自己动作了。
- 锁住的东西越少,执行效率越高
- 只读的数据不需要互斥;又读又写的数据块才需要
std::call_once()
是避免竞争的手段,是最简单的互斥机制,用来确保某个函数或方法只调用一次,不论多少线程尝试调用,需要配合std::once_flag
使用。
#include <iostream>
#include <vector>
#include <thread>
#include <mutex> // call_once
std::once_flag init_flag;
void init()
{
std::cout << "执行初始化..." << std::endl;
}
void task_sleep()
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::cout << "线程执行" << std::endl;
//init();
std::call_once(init_flag, init);
}
int main(int argc, char* argv[])
{
std::vector<std::thread> threads(10);
for (int i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread{ task_sleep };
}
for (auto& t : threads) t.join();
return 0;
}
互斥体类
互斥体Mutual Exclusion,或称“互斥量”,相当于共享数据的大门,能够锁定和解除锁定。类型有
std::mutex
与std::recursive_mutex
std::timed_mutex
与std::recursive_timed_mutex
std::shared_timed_mutex
共享拥有是只能被一个线程锁定去写入,可被多个线程锁定去读取
共有的成员函数有:
lock()
: 尝试锁定,会阻塞当前线程,直到获得锁定成功。try_lock()
: 尝试锁定一次,返回结果true或falseunlock()
: 解除锁定
定时类特有:
try_lock_for()
:在某段时间内尝试锁定,超时则返回falsetry_lock_until()
:在某个时间点前尝试锁定,超时则返回false
锁类
是管理互斥体的方式,能更方便安全自动地对互斥体进行锁定lock和解除锁定unlock,锁类在作用域结束后析构,同时解锁关联的互斥体。
- lock_guard
guard n. 门卫;哨兵
lock_guard:最简单安全的上锁和解锁方式,在 lock_guard 对象构造时自动对互斥量上锁,析构时自动解锁,即使程序抛出异常后也能解锁已被上锁的 互斥量。来源
- unique_lock
以 独占所有权 的方式(unique owership)自动管理 互斥体mutex 的上锁和解锁,也是安全的,允许定时操作,比lock_guard更灵活。
-
shared_lock
-
std::lock() 和 std::try_lock()
提供了两个可变参数的模板函数,用来同时锁定多个锁,而不会出现死锁。如 defer_lock_t 方式的unique_lock或shared_lock都需要手动锁定。std::lock()
:如果其中某个互斥体+锁出现异常,则解除所有锁定。std::try_lock()
:尝试锁定一次所有的互斥体+锁,全部成功则返回-1;失败则解除所有锁定,返回失败点的位置索引。
代码示例:
互斥锁示例
代码1:使用 lock_guard
同步函数对象的所有实例对cout流输出的访问,消除交错现象。
- 将互斥体定义为静态数据成员
- 静态数据成员属于类而不是某个对象,对所有对象共享
- 静态成员变量需要在类内声明static,类外定义并初始化
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
class Counter
{
public:
Counter(int _id, int _iterations) : id(_id), iterations(_iterations) {};
void operator()() const
{
for (int i=0; i<=iterations; ++i)
{
std::lock_guard<std::mutex> lock(mMutex); // 每次循环结束则自动解除锁定
std::cout << "Counter " << id << " has value:";
std::cout << i << std::endl;
}
}
private:
int id;
int iterations;
static std::mutex mMutex; // 声明静态数据成员
};
std::mutex Counter::mMutex; // 静态数据成员的定义和初始化
int main(int argc, char* argv[])
{
std::vector<std::thread> threads(3);
for (int i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread{ Counter{i, 5} };
}
for (auto& t : threads) t.join();
return 0;
}
代码2:使用 unique_lock
尝试锁定cout输出流,超时则放弃。
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
class Counter
{
public:
Counter(int _id, int _iterations) : id(_id), iterations(_iterations) {};
void operator()() const
{
for (int i=0; i<=iterations; ++i)
{
using namespace std::chrono_literals;
std::unique_lock<std::timed_mutex> lock(mMutex, 50ms); // 尝试在50ms内锁定
if (lock)
{
std::this_thread::sleep_for(10ms);
std::cout << "Counter " << id << " has value:";
std::cout << i << std::endl;
}
else
{
std::cout << "Counter " << id << " lock failed." << std::endl;
}
}
}
private:
int id;
int iterations;
static std::timed_mutex mMutex;
};
std::timed_mutex Counter::mMutex; // 静态数据成员的定义和初始化
int main(int argc, char* argv[])
{
std::vector<std::thread> threads(3);
for (int i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread{ Counter{i, 5} };
}
for (auto& t : threads) t.join();
return 0;
}
结果看出,锁定时间不宜过长,应立即释放。
线程通信:条件变量 <condition_variable>
条件变量(condition variable)是显式的线程间通信的一种机制,主要包括两个动作:一个或多个线程等待某个条件为真,而将自己挂起(阻塞);另一个线程使通知条件(谓词参数)成立,再通知等待线程继续。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。
有意通过共享变量通知后台线程时必须:
- 获得互斥锁:
std::mutex
+ lock类 - 在保有锁时进行修改
- 在
std::condition_variable
上执行notify_one
或notify_all
(不需要为了通知而保有锁,建议通知前手动解锁,以避免等待线程刚被唤醒就阻塞)
即使共享变量是原子的,也必须在互斥下修改它,以正确地发布修改到等待的线程。
任何有意要等待共享变量的后台线程必须:
- 获得互斥锁
std::unique_lock<std::mutex>
,和锁定共享变量者的互斥相同 - 执行condition_variable ::
wait
、wait_for
或wait_until
及谓词参数,进行等待并挂起(阻塞)该线程 - 收到 condition_variable 的唤醒通知后,继续任务
后台线程会被唤醒的情形包括:① 通知线程调用
共享变量的 notify_one() 或 notify_all()
,② 等待超时,③ 虚假唤醒 发生。因此,为了正确起见,有必要在线程完成等待之后验证 谓词参数 的条件确实为真;若唤醒是虚假的,则继续等待。
其他:
- std::condition_variable 只可与
std::unique_lock
一同使用,这限制在一些平台上的效率。 - std::condition_variable_any 可与任何类型的 Lock对象 一同使用,例如 std::shared_lock
condition_variable类
禁止复制构造、可移动构造,支持以下方法:- notify_one,notify_all
- wait,wait_for,wait_until
代码示例: 后台线程将传入的字符串转为小写字母
- 后台线程等待父线程传入数据;
- 父线程传入数据,唤醒后台线程,随即等待后台线程的处理结果;
- 后台线程进行大小写转换,随后唤醒父线程
- 父线程拿到处理结果
#include "Worker.h"
int main(int argc, char* argv[])
{
// 创建后台线程
Worker* worker = new Worker;
std::thread th{ &Worker::task, worker };
// 发送数据到 worker 线程
{
std::lock_guard<std::mutex> lock(worker->mMutex); // 作用域结束自动解锁
worker->mQueue.push("THIS IS AN EXAMPLE");
worker->ready = true;
std::cout << "Data for worker sent successfully\n";
std::cout << worker->mQueue.back() << std::endl;
}
worker->mCondVar.notify_one();
// 等待处理结果
{
std::unique_lock<std::mutex> lock(worker->mMutex);
worker->mCondVar.wait(lock, [worker] {return worker->processed; });
}
std::cout << worker->mQueue.back() << std::endl;
th.join();
return 0;
}
=====================================
// Worker.hpp
#pragma once
#include <iostream>
#include <queue>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
class Worker
{
public:
Worker() {}
void task()
{
// 等待父线程传入数据
std::unique_lock<std::mutex> lock(mMutex); // 只能使用unique_lock
mCondVar.wait(lock, [this] {return this->ready; }); // 等待完成即锁定
// 处理数据
std::string& str = mQueue.back();
std::transform(str.begin(), str.end(), str.begin(), tolower);
// 完成后通知父线程
processed = true;
std::cout << "Worker thread signals data processing completed\n";
lock.unlock(); // 通知前完成手动解锁,以避免等待线程刚被唤醒就阻塞
mCondVar.notify_one();
}
public:
bool ready = false;
bool processed = false;
std::queue<std::string> mQueue;
std::mutex mMutex;
std::condition_variable mCondVar;
};
future机制 <future>
future机制用来方便取出后台线程的处理结果。使用方式包括:
- future/promise
- packaged_task:打包一个函数,存储其返回值以进行异步获取
- async: 异步运行一个函数(有可能在新线程中执行),返回结果存储在 std::future中
代码示例: 异步执行并取出结果
- 参数
std::launch::async
强制开启一个线程,若无法开启则报错 - 参数
std::launch::deferred
当后期执行get()调用时才会“延期”执行后台线程
#include <future>
template<class T>
int twoSum(T a, T b) // 不能使用引用传值
{
return a + b;
}
int main(int argc, char* argv[])
{
auto ret = std::async(std::launch::async, twoSum<int>, 2, 5);
auto value = ret.get();
std::cout << value << std::endl; // 7
return 0;
}
线程池
专业的线程池能够更加强大地管理多线程,而不是动态地创建和销毁大量线程,如 TBB(Intel® Threading Building Block), PPL(Parallel Patterns Library) 等。
编程错误及处理
thread(43,14): error C2672: “std::invoke”: 未找到匹配的重载函数
----- 创建线程的参数类型不匹配