Pthreads是unix操作系统里面的一个标准库(因此不得不打开跑在机械上的慢死的ubuntu),与之前学习的MPI不同,Pthreads用于共享内存系统的编程,也就是不同的核共享一块内存。相比分布式内存系统,不必担心不同核内存下数据的不一致问题,但是存在了“答案莫名奇妙错误”的情况。下面看一个简单的例子:
我的代码是跑在ubuntu的clion上的,需要在CMake中加一点配置
find_package(Threads REQUIRED)
target_link_libraries(main Threads::Threads)
其实默认启动Pthreads程序都要特殊的命令的,比如运行 ./main <启动的线程数>,但是对于我这种懒癌晚期的人,是不可能用命令行的,所以直接在程序输入启动的线程数目。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
int thread_count;
void * Hello(void * rank);
int main(int argc, char * argv[])
{
long thread;
pthread_t * thread_handles;
scanf("%d",&thread_count);
// thread_count = strtol(argv[1], NULL, 10);
thread_handles = malloc(thread_count * sizeof(pthread_t));
for(thread=0 ; thread<thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, Hello, (void *)thread);
printf("Hello from the main thread\n");
for(thread=0; thread<thread_count; thread++)
pthread_join(thread_handles[thread], NULL);
free(thread_handles);
return 0;
}
void * Hello(void * rank)
{
long my_rank = (long)rank;
printf("Hello from thread %ld of %d\n", my_rank, thread_count);
return NULL;
}
具体的Pthreads的函数就不一一说明了。要注意的是,不同于MPI,Pthreads不是由脚本来启动的,而是直接由可执行程序启动,也就是说我们必须显示的启动线程。
下面是简单的多线程算矩阵*向量的代码。因为用math库编译gcc要加-lm嫌麻烦就写成了c++
#include <bits/stdc++.h>
#include <pthread.h>
#define maxn 110
using namespace std;
int thread_count;
int A[maxn][maxn], x[maxn], y[maxn];
int n, size;
void *Hello(void *rank);
void *Mul(void *rand);
int main() {
long thread;
pthread_t *thread_handles;
scanf("%d", &thread_count);
// thread_count = strtol(argv[1], NULL, 10);
scanf("%d", &n);
for (int i = 0; i < n; i++)
for (int j = 0; j < n; j++)
A[i][j] = rand() % 100;
for (int i = 0; i < n; i++)x[i] = rand() % 100;
size = ceil(1.0 * n / thread_count);
thread_handles = static_cast<pthread_t *>(malloc(thread_count * sizeof(pthread_t)));
for (thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, Mul, (void *) thread);
for (thread = 0; thread < thread_count; thread++)
pthread_join(thread_handles[thread], NULL);
for (int i = 0; i < n; i++)
printf("%d ", y[i]);
free(thread_handles);
return 0;
}
void *Hello(void *rank) {
long my_rank = (long) rank;
printf("Hello from thread %ld of %d\n", my_rank, thread_count);
return NULL;
}
void *Mul(void *rank) {
long my_rank = (long) rank;
int L = size * my_rank;
int R = size * (my_rank + 1) - 1;
for (int i = L; i <= R; i++)
for (int j = 0; j < n; j++)
y[i] += A[i][j] * x[j];
return NULL;
}
下面是“答案莫名奇妙错误”的一个例子:
#include <bits/stdc++.h>
#include <pthread.h>
#define db double
#define maxn 110
using namespace std;
int thread_count, n, size;
db ans;
void *Cal(void *rand);
int main() {
cout << fixed << setprecision(10);
pthread_t *thread_handles;
cin >> thread_count >> n;
size = ceil(1.0 * n / thread_count);
cout << size << endl;
thread_handles = static_cast<pthread_t *>(malloc(thread_count * sizeof(pthread_t)));
for (long thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, Cal, (void *) thread);
for (long thread = 0; thread < thread_count; thread++)
pthread_join(thread_handles[thread], NULL);
cout << ans * 4 << endl;
free(thread_handles);
return 0;
}
void *Cal(void *rank) {
long my_rank = (long) rank;
int L = size * my_rank;
int R = size * (my_rank + 1) - 1;
cout << L << " " << R << endl;
db now;
if (my_rank & 1)now = -1;
else now = 1;
for (int i = L; i <= R; i++, now = -now)
ans += now / (2 * i + 1);
return NULL;
}
这个就是书上的代码,自己实现了一下也能发现很明显的问题,开一个线程的时候问题不大,但是开多个的时候答案误差就很大。这是因为简单的ans+=now/(2*i+1)这句话在执行时分为好几步,多个线程同时操作就会出问题,导致答案错误,这是我们不能接受的。
解决方法有很多:
1:忙等待
#include <bits/stdc++.h>
#include <pthread.h>
#define ll long long
#define db double
#define maxn 110
using namespace std;
ll thread_count, n, size;
db ans;
int falg;
void *Cal(void *rand);
int main() {
cout << fixed << setprecision(10);
pthread_t *thread_handles;
cin >> thread_count >> n;
clock_t str, end;
str = clock();
size = ceil(1.0 * n / thread_count);
thread_handles = static_cast<pthread_t *>(malloc(thread_count * sizeof(pthread_t)));
for (long thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, Cal, (void *) thread);
for (long thread = 0; thread < thread_count; thread++) {
pthread_join(thread_handles[thread], NULL);
}
cout << ans * 4 << endl;
free(thread_handles);
end = clock();
cout << 1.0 * (end - str) / CLOCKS_PER_SEC << endl;
return 0;
}
void *Cal(void *rank) {
long my_rank = (long) rank;
ll L = size * my_rank;
ll R = size * (my_rank + 1) - 1;
db now, sum = 0;
if (my_rank & 1)now = -1;
else now = 1;
for (ll i = L; i <= R; i++, now = -now) {
sum += now / (2 * i + 1);
}
while (falg != my_rank);
ans += sum;
falg = (falg + 1) % thread_count;
return NULL;
}
2:互斥量
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <pthread.h>
#define ll long long
#define db double
#define maxn 110
ll thread_count, n, size;
db ans;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *Cal(void *rand);
int main() {
pthread_t *thread_handles;
scanf("%lld%lld", &thread_count, &n);
clock_t str, end;
str = clock();
size = (int) (1.0 * n / thread_count + 0.99);
thread_handles = (malloc(thread_count * sizeof(pthread_t)));
for (long thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, Cal, (void *) thread);
for (long thread = 0; thread < thread_count; thread++) {
pthread_join(thread_handles[thread], NULL);
}
printf("%.10f\n", ans * 4);
free(thread_handles);
pthread_mutex_destroy(&mutex);
end = clock();
printf("time:%.5f\n", 1.0 * (end - str) / CLOCKS_PER_SEC);
return 0;
}
void *Cal(void *rank) {
long my_rank = (long) rank;
ll L = size * my_rank;
ll R = size * (my_rank + 1) - 1;
db now, sum = 0;
if (my_rank & 1)now = -1;
else now = 1;
for (ll i = L; i <= R; i++, now = -now) {
sum += now / (2 * i + 1);
}
pthread_mutex_lock(&mutex);
ans += sum;
pthread_mutex_unlock(&mutex);
return NULL;
}
通过这两种的比较,可以发现,如果开的线程很多,使自己机器实际拥有线程的好几倍了,忙等待就会变得很慢,而互斥量则不会。原因是,如果我们开了好多线程,因为本身的核数有限,那肯定有许多线程是等待的,并且,当某一个线程完事以后os选择新的线程的时候是随机的,而我们的falg变量是递增的,也就是说,os可能选择了10号线程,但是falg=5,所以10号一直等。就是变慢很多。