Pthreads是unix操作系统里面的一个标准库(因此不得不打开跑在机械上的慢死的ubuntu),与之前学习的MPI不同,Pthreads用于共享内存系统的编程,也就是不同的核共享一块内存。相比分布式内存系统,不必担心不同核内存下数据的不一致问题,但是存在了“答案莫名奇妙错误”的情况。下面看一个简单的例子:

我的代码是跑在ubuntu的clion上的,需要在CMake中加一点配置

find_package(Threads REQUIRED)
target_link_libraries(main Threads::Threads)

其实默认启动Pthreads程序都要特殊的命令的,比如运行 ./main <启动的线程数>,但是对于我这种懒癌晚期的人,是不可能用命令行的,所以直接在程序输入启动的线程数目。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int thread_count;

void * Hello(void * rank);

int main(int argc, char * argv[])
{
    long thread;
    pthread_t * thread_handles;
    scanf("%d",&thread_count);
//    thread_count = strtol(argv[1], NULL, 10);

    thread_handles = malloc(thread_count * sizeof(pthread_t));

    for(thread=0 ; thread<thread_count; thread++)
        pthread_create(&thread_handles[thread], NULL, Hello, (void *)thread);

    printf("Hello from the main thread\n");

    for(thread=0; thread<thread_count; thread++)
        pthread_join(thread_handles[thread], NULL);

    free(thread_handles);
    return 0;
}

void * Hello(void * rank)
{
    long my_rank = (long)rank;
    printf("Hello from thread %ld of %d\n", my_rank, thread_count);
    return NULL;
}

具体的Pthreads的函数就不一一说明了。要注意的是,不同于MPI,Pthreads不是由脚本来启动的,而是直接由可执行程序启动,也就是说我们必须显示的启动线程。

下面是简单的多线程算矩阵*向量的代码。因为用math库编译gcc要加-lm嫌麻烦就写成了c++

#include <bits/stdc++.h>
#include <pthread.h>

#define maxn 110
using namespace std;
int thread_count;
int A[maxn][maxn], x[maxn], y[maxn];
int n, size;

void *Hello(void *rank);

void *Mul(void *rand);

int main() {
    long thread;
    pthread_t *thread_handles;
    scanf("%d", &thread_count);
//    thread_count = strtol(argv[1], NULL, 10);
    scanf("%d", &n);
    for (int i = 0; i < n; i++)
        for (int j = 0; j < n; j++)
            A[i][j] = rand() % 100;
    for (int i = 0; i < n; i++)x[i] = rand() % 100;
    size = ceil(1.0 * n / thread_count);
    thread_handles = static_cast<pthread_t *>(malloc(thread_count * sizeof(pthread_t)));


    for (thread = 0; thread < thread_count; thread++)
        pthread_create(&thread_handles[thread], NULL, Mul, (void *) thread);


    for (thread = 0; thread < thread_count; thread++)
        pthread_join(thread_handles[thread], NULL);
    for (int i = 0; i < n; i++)
        printf("%d ", y[i]);

    free(thread_handles);
    return 0;
}

void *Hello(void *rank) {
    long my_rank = (long) rank;
    printf("Hello from thread %ld of %d\n", my_rank, thread_count);
    return NULL;
}

void *Mul(void *rank) {
    long my_rank = (long) rank;
    int L = size * my_rank;
    int R = size * (my_rank + 1) - 1;
    for (int i = L; i <= R; i++)
        for (int j = 0; j < n; j++)
            y[i] += A[i][j] * x[j];
    return NULL;
}

下面是“答案莫名奇妙错误”的一个例子:

#include <bits/stdc++.h>
#include <pthread.h>

#define db double
#define maxn 110
using namespace std;
int thread_count, n, size;
db ans;


void *Cal(void *rand);

int main() {
    cout << fixed << setprecision(10);

    pthread_t *thread_handles;
    cin >> thread_count >> n;
    size = ceil(1.0 * n / thread_count);
    cout << size << endl;
    thread_handles = static_cast<pthread_t *>(malloc(thread_count * sizeof(pthread_t)));


    for (long thread = 0; thread < thread_count; thread++)
        pthread_create(&thread_handles[thread], NULL, Cal, (void *) thread);


    for (long thread = 0; thread < thread_count; thread++)
        pthread_join(thread_handles[thread], NULL);
    cout << ans * 4 << endl;
    free(thread_handles);
    return 0;
}


void *Cal(void *rank) {
    long my_rank = (long) rank;
    int L = size * my_rank;
    int R = size * (my_rank + 1) - 1;
    cout << L << " " << R << endl;
    db now;
    if (my_rank & 1)now = -1;
    else now = 1;
    for (int i = L; i <= R; i++, now = -now)
        ans += now / (2 * i + 1);
    return NULL;
}

这个就是书上的代码,自己实现了一下也能发现很明显的问题,开一个线程的时候问题不大,但是开多个的时候答案误差就很大。这是因为简单的ans+=now/(2*i+1)这句话在执行时分为好几步,多个线程同时操作就会出问题,导致答案错误,这是我们不能接受的。

解决方法有很多:

1:忙等待

#include <bits/stdc++.h>
#include <pthread.h>

#define ll long long
#define db double
#define maxn 110
using namespace std;
ll thread_count, n, size;
db ans;
int falg;


void *Cal(void *rand);

int main() {
    cout << fixed << setprecision(10);

    pthread_t *thread_handles;
    cin >> thread_count >> n;
    clock_t str, end;
    str = clock();
    size = ceil(1.0 * n / thread_count);
    thread_handles = static_cast<pthread_t *>(malloc(thread_count * sizeof(pthread_t)));


    for (long thread = 0; thread < thread_count; thread++)
        pthread_create(&thread_handles[thread], NULL, Cal, (void *) thread);


    for (long thread = 0; thread < thread_count; thread++) {
        pthread_join(thread_handles[thread], NULL);
    }
    cout << ans * 4 << endl;
    free(thread_handles);
    end = clock();
    cout << 1.0 * (end - str) / CLOCKS_PER_SEC << endl;
    return 0;
}


void *Cal(void *rank) {
    long my_rank = (long) rank;
    ll L = size * my_rank;
    ll R = size * (my_rank + 1) - 1;
    db now, sum = 0;
    if (my_rank & 1)now = -1;
    else now = 1;
    for (ll i = L; i <= R; i++, now = -now) {
        sum += now / (2 * i + 1);
    }
    while (falg != my_rank);
    ans += sum;
    falg = (falg + 1) % thread_count;
    return NULL;
}

2:互斥量

#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <pthread.h>

#define ll long long
#define db double
#define maxn 110

ll thread_count, n, size;
db ans;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *Cal(void *rand);

int main() {
    pthread_t *thread_handles;
    scanf("%lld%lld", &thread_count, &n);
    clock_t str, end;
    str = clock();
    size = (int) (1.0 * n / thread_count + 0.99);
    thread_handles = (malloc(thread_count * sizeof(pthread_t)));


    for (long thread = 0; thread < thread_count; thread++)
        pthread_create(&thread_handles[thread], NULL, Cal, (void *) thread);


    for (long thread = 0; thread < thread_count; thread++) {
        pthread_join(thread_handles[thread], NULL);
    }
    printf("%.10f\n", ans * 4);
    free(thread_handles);
    pthread_mutex_destroy(&mutex);
    end = clock();
    printf("time:%.5f\n", 1.0 * (end - str) / CLOCKS_PER_SEC);
    return 0;
}


void *Cal(void *rank) {
    long my_rank = (long) rank;
    ll L = size * my_rank;
    ll R = size * (my_rank + 1) - 1;
    db now, sum = 0;
    if (my_rank & 1)now = -1;
    else now = 1;
    for (ll i = L; i <= R; i++, now = -now) {
        sum += now / (2 * i + 1);
    }
    pthread_mutex_lock(&mutex);
    ans += sum;
    pthread_mutex_unlock(&mutex);
    return NULL;
}

通过这两种的比较,可以发现,如果开的线程很多,使自己机器实际拥有线程的好几倍了,忙等待就会变得很慢,而互斥量则不会。原因是,如果我们开了好多线程,因为本身的核数有限,那肯定有许多线程是等待的,并且,当某一个线程完事以后os选择新的线程的时候是随机的,而我们的falg变量是递增的,也就是说,os可能选择了10号线程,但是falg=5,所以10号一直等。就是变慢很多。