0. 条件变量

条件变量是pthread线程库为线程同步提供的另外一种机制。它允许线程在某条件没有到达的情况下投入睡眠,在条件到达之后被唤醒。条件变量通常与互斥锁配合起来使用。

1. 相关函数

1.1. 初始化

条件变量的类型是pthread_cond_t,它可以通过两种方式进行初始化。

  • 静态分配的条件变量可以通过常量PTHREAD_COND_INITIALIZER初始化, 如:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 动态分配的条件变量必须通过下面两个函数进行初始化和销毁
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);

1.2. 发送信号

当条件到达时,我们可以通过下面两个函数唤醒等待队列中的线程。

// 唤醒等待队列中的一个线程
int pthread_cond_signal(pthread_cond_t *cond);

// 唤醒等待队列中的所有线程,并重新参与调度
int pthread_cond_broadcast(pthread_cond_t *cond);

1.3. wait函数

pthread_cond_wait是一个阻塞函数,当等待的条件不满足时,线程阻塞在此函数。wait函数将线程投入等待队列,同时将传入的mutex解锁,这两个步骤是原子的。当wait函数返回时,mutex再次被加锁。

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

2. 实例

下面我们来看一个利用条件变量来实现多线程同步,模拟文件下载过程的例子。在这个例子当中我们希望用多个线程(线程个数由用户输入)同时下载,用一个线程来检查下载进度,当下载完成,结束所有线程。

// download thread
void *do_download(void * arg)
{
    int i = 0;

    for (i = 0; ; i++) {
        pthread_mutex_lock(&qlock); // 加锁检查
        if (process_bar > 100) {
            pthread_mutex_unlock(&qlock);
            pthread_cond_signal(&cond); // 下载完成,发送信号
            return ((void*)0);
        }
        printf("downloading......%d%%\n", process_bar);
        process_bar++;
        pthread_mutex_unlock(&qlock);
        sleep(1); // 假设1s可以下载1%
    }
}

// check download status thread
void *check_download_status(void* arg)
{
    pthread_mutex_lock(&qlock);
    printf("is download complete?\n");
    while (process_bar < 100) {
        printf("%d%%......\n", process_bar);
        pthread_cond_wait(&cond, &qlock); // 等待下载完成信号
        printf("is download complete?\n");
    }
    printf("download success!\n");
    pthread_mutex_unlock(&qlock);
    return ((void*)0);
}

// main thread
int process_bar = 0;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  // 条件变量初始化
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; // mutex初始化

int main(int argc, char* argv[])
{
    pthread_t   *tid;
    int         err, i, cnt;

    if (argc != 2) {
        printf("usage:./a.out threadnum(threadnum >= 2)\n");
        exit(0);
    }
    cnt = atoi(argv[1]);  // 线程个数由用户输入
    if (cnt < 2) {
        printf("thread num must be no less than 2\n");
        exit(0);
    }
    tid = (pthread_t*)malloc(sizeof(pthread_t) * cnt);
    for (i = 0; i < cnt - 1; i++) {  //开启cnt-1个线程同时下载
        err = pthread_create(&tid[i], NULL, do_download, NULL);
        if (err != 0) {
            printf("create thread error\n");
            exit(-1);
        }
    }
    // 开启检查线程
    err = pthread_create(&tid[cnt - 1], NULL, check_download_status, NULL); 
    if (err != 0) {
        printf("create thread error\n");
        exit(-1);
    }
    for (i = 0; i < cnt; i++) {
        pthread_join(tid[i], NULL);
    }
    free(tid);
    exit(0);
}

3. 运行结果

上面这个例程的运行效果是这样

is download complete?
0%......
downloading......0%
downloading......1%
...
downloading......99%
downloading......100%
is download complete?
download success!

如果只有1个下载线程,那么这个程序大概需要运行100s,如果开启100个下载线程,这个程序只需要运行1s多一点。想一下,如果开启1000个线程需要运行多久呢?

例程下载:https://github.com/zkangHUST/OperatingSystem/blob/master/thread/11.c

4. 参考资料

[1.] 《unix环境高级编程》第11章

[2.] Allen’s Blog https://allen.blog.csdn.net/article/details/61926511