使用应用级并发的应用程序称为并发程序(concurrent program)。现代操作系统提供3种基本的构造并发程序的方法:进程、I/O多路复用和线程。下面将分别予以讨论。

1. 基于进程的并发编程

我们可以利用熟悉的fork、execve及waitpid函数来开发基于进程的并发编程。下面以构造并发服务器为例,在服务器程序中,父进程接受客户端的连接请求,然后创建一个新的子进程为每一个客户端提供服务。

假设服务器正在监听一个监听描述符3上的连接请求,此时刚好有一个客户端1请求连接,所以服务器接受它的请求并返回一个已连接描述符4。在接受连接请求之后,服务器派生一个子进程,这个子进程获得服务器描述符表的完整副本。子进程关闭它副本中的监听描述符3,而父进程关闭它的已连接描述符4的副本,因为父子进程中的描述符都指向同一个文件表项,所以父进程关闭它的以连接描述符的副本是至关重要的。否则,当子进程退出要关闭已连接描述符时会失败,因为在父进程里仍有一个打开的已连接描述符。同样的情况也适用于监听描述符,所以在子进程里要及时的关闭监听描述符。

现在,假设有另一个客户端2发送来连接请求,服务器接受并返回了一个新的已连接描述符5。服务器又会派生一个子进程,这个子进程同样也会获得服务器描述符表的完整副本,它关闭了监听描述符并用已连接描述符5服务客户端,而服务器关闭已连接描述符,并继续等待新的连接请求。

此时服务器派生的两个子进程正在并发地服务于它们连接的客户端。如下图所示:

<img src="https://cdn.jsdelivr.net/gh/Leon1023/leon_pics/img/20200327231845.jpg" alt="微图" style="zoom: 33%;" />

下面给出源代码:

/* 
 * echoserverp.c - A concurrent echo server based on processes
 */
/* $begin echoserverpmain */
#include "csapp.h"
void echo(int connfd);

void sigchld_handler(int sig) //line:conc:echoserverp:handlerstart
{
    while (waitpid(-1, 0, WNOHANG) > 0)
    ;
    return;
} //line:conc:echoserverp:handlerend

int main(int argc, char **argv) 
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;

    if (argc != 2) {
    fprintf(stderr, "usage: %s <port>\n", argv[0]);
    exit(0);
    }

    Signal(SIGCHLD, sigchld_handler);
    listenfd = Open_listenfd(argv[1]);
    while (1) {
    clientlen = sizeof(struct sockaddr_storage); 
    connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
    if (Fork() == 0) { 
        Close(listenfd); /* Child closes its listening socket */
        echo(connfd);    /* Child services client */ //line:conc:echoserverp:echofun
        Close(connfd);   /* Child closes connection with client */ //line:conc:echoserverp:childclose
        exit(0);         /* Child exits */
    }
    Close(connfd); /* Parent closes connected socket (important!) */ //line:conc:echoserverp:parentclose
    }
}
/* $end echoserverpmain */
  • 首先,服务器会运行很长时间,所以必须要包括一个SIGCHLD处理程序来回收僵尸进程的资源。且当SIGCHLD处理程序执行时,SIGCHLD信号是被阻塞的,所以处理程序必须准备好回收多个僵死的进程资源。

  • 其次,父子进程必须关闭它们各自的connfd副本。尤其是父进程必须关闭它的已连接描述符,以避免内存泄漏。

  • 最后,因为套接字的文件表表项中的引用计数,直到父子进程的connfd都关闭了,到客户端的连接才会终止。

    利用进程的并发程序优点: 父子进程间共享文件表,但是不共享用户地址空间。这样它们之间就不会相互覆盖彼此的资源。

    利用进程的并发程序缺点: 独立的地址空间使得父子进程间共享状态信息更加困难,为此不得不使用开销很高的显示IPC(进程间通信),这样速度上不会很快。

2. 基于I/O多路复用的并发编程

服务器使用I/O多路复用,借助select函数检测输入事件的发生。当每个已连接描述符准备好读时,服务器就从描述符读和回写一个文本行。下图展示了一个完整的基于I/O多路复用的并发事件驱动服务器代码。

一个pool结构里维护着活动客户端的集合,在调用init_pool初始化池之后,服务器进入无限循环。在循环的每次迭代中,服务器调用select函数来检测两种不同类型的输入事件:a)来自一个新客户端的连接请求;b)一个已连接描述符准备好可以读了。

当一个连接请求到达时,服务器打开链接,并调用add_client函数将该客户端添加到池里。最后,服务器调用check_clients函数,把来自已准备好的已连接描述符的一个文本行回送回去。

/* 
 * echoservers.c - A concurrent echo server based on select
 */

/* $begin echoserversmain */
#include "csapp.h"

typedef struct { /* Represents a pool of connected descriptors */ 
    int maxfd;        /* Largest descriptor in read_set */   
    fd_set read_set;  /* Set of all active descriptors */
    fd_set ready_set; /* Subset of descriptors ready for reading  */
    int nready;       /* Number of ready descriptors from select */   
    int maxi;         /* Highwater index into client array */
    int clientfd[FD_SETSIZE];    /* Set of active descriptors */
    rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
} pool; 

void init_pool(int listenfd, pool *p);
void add_client(int connfd, pool *p);
void check_clients(pool *p);

int byte_cnt = 0; /* Counts total bytes received by server */

int main(int argc, char **argv)
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    static pool pool; 

    if (argc != 2) {
    fprintf(stderr, "usage: %s <port>\n", argv[0]);
    exit(0);
    }
    listenfd = Open_listenfd(argv[1]);
    init_pool(listenfd, &pool); //line:conc:echoservers:initpool

    while (1) {
    /* Wait for listening/connected descriptor(s) to become ready */
    pool.ready_set = pool.read_set;
    pool.nready = Select(pool.maxfd+1, &pool.ready_set, NULL, NULL, NULL);

    /* If listening descriptor ready, add new client to pool */
    if (FD_ISSET(listenfd, &pool.ready_set)) { //line:conc:echoservers:listenfdready
            clientlen = sizeof(struct sockaddr_storage);
        connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); //line:conc:echoservers:accept
        add_client(connfd, &pool); //line:conc:echoservers:addclient
    }
    
    /* Echo a text line from each ready connected descriptor */ 
    check_clients(&pool); //line:conc:echoservers:checkclients
    }
}
/* $end echoserversmain */
  • init_pool函数初始化客户端池。clientfd数组表示已连接描述符的集合,其中-1表示一个可用的槽位。开始时,已连接描述符集合是空的,而且监听描述符是select读集合中唯一的描述符。
/* $begin init_pool */
void init_pool(int listenfd, pool *p) 
{
    /* Initially, there are no connected descriptors */
    int i;
    p->maxi = -1;                   //line:conc:echoservers:beginempty
    for (i=0; i< FD_SETSIZE; i++)  
    p->clientfd[i] = -1;        //line:conc:echoservers:endempty

    /* Initially, listenfd is only member of select read set */
    p->maxfd = listenfd;            //line:conc:echoservers:begininit
    FD_ZERO(&p->read_set);
    FD_SET(listenfd, &p->read_set); //line:conc:echoservers:endinit
}
/* $end init_pool */
  • add_client函数添加一个新客户端到活动客户端池中。在clientfd数组中找到一个空槽位后,服务器将这个已连接描述符添加到数组中,并初始化相应的RIO读缓冲区。然后将这个已连接描述符添加到select读集合,并更新该池(maxfd、maxi、)。
/* $begin add_client */
void add_client(int connfd, pool *p) 
{
    int i;
    p->nready--;
    for (i = 0; i < FD_SETSIZE; i++)  /* Find an available slot */
    if (p->clientfd[i] < 0) { 
        /* Add connected descriptor to the pool */
        p->clientfd[i] = connfd;                 //line:conc:echoservers:beginaddclient
        Rio_readinitb(&p->clientrio[i], connfd); //line:conc:echoservers:endaddclient

        /* Add the descriptor to descriptor set */
        FD_SET(connfd, &p->read_set); //line:conc:echoservers:addconnfd

        /* Update max descriptor and pool highwater mark */
        if (connfd > p->maxfd) //line:conc:echoservers:beginmaxfd
        p->maxfd = connfd; //line:conc:echoservers:endmaxfd
        if (i > p->maxi)       //line:conc:echoservers:beginmaxi
        p->maxi = i;       //line:conc:echoservers:endmaxi
        break;
    }
    if (i == FD_SETSIZE) /* Couldn't find an empty slot */
    app_error("add_client error: Too many clients");
}
/* $end add_client */
  • check_clients函数回送文本行。如果客户端关闭了连接,服务器器这端回检测到EOF,然后服务器也关闭连接,并从池中清除掉这个描述符。
/* $begin check_clients */
void check_clients(pool *p) 
{
    int i, connfd, n;
    char buf[MAXLINE]; 
    rio_t rio;

    for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) {
    connfd = p->clientfd[i];
    rio = p->clientrio[i];

    /* If the descriptor is ready, echo a text line from it */
    if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) { 
        p->nready--;
        if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
        byte_cnt += n; //line:conc:echoservers:beginecho
        printf("Server received %d (%d total) bytes on fd %d\n", 
               n, byte_cnt, connfd);
        Rio_writen(connfd, buf, n); //line:conc:echoservers:endecho
        }

        /* EOF detected, remove descriptor from pool */
        else { 
        Close(connfd); //line:conc:echoservers:closeconnfd
        FD_CLR(connfd, &p->read_set); //line:conc:echoservers:beginremove
        p->clientfd[i] = -1;          //line:conc:echoservers:endremove
        }
    }
    }
}
/* $end check_clients */
  • 优点:
  • 因为在单一进程中执行,所以不需要频繁的进程间上下文切换,进而速度较快;
  • 每个逻辑流共享进程的全部地址空间,共享数据较为简单;
  • 单一进程可以方便的利用GDB调试工具。
  • 缺点:
  • 这种事件驱动的程序比起基于进程的程序编码上相对复杂;
  • 如果运行在多核处理器上不能充分利用处理器的并发优势。

3. 基于线程的并发编程

在基于进程的并发编程方法中,我们为每个流使用了单独的进程,内核会自动调度每个进程,而每个进程有自己的独立地址空间,使的共享数据变得困难。在基于I/O调度的并发编程方法中,我们创建自己的逻辑流,利用I/O多路复用来显示地调度流,因为只有一个进程,所有的流共享整个地址空间。

线程(thread)就是运行在上下文中的逻辑流。现代系统运行里允许一个进程里同时运行多个线程,线程由内核自动调度。每个线程有自己的线程上下文(线程ID、栈、栈指针、程序计数器、通用目的寄存器和条件码),所有的线程共享进程的整个虚拟地址空间。

基于线程的并发编程结合了前两种方法的优势,各个线程由内核自动调度,且线程间又共享地址空间里的代码、数据、堆、共享库和打开的文件。

3.1创建线程

线程通过调用pthread_create函数来创建其它线程:

#include <pthread.h>
typedef void *(func)(void * arg);

int pthread_create(pthread_t *tid, pthread_attr_t *attr,
                   func *f, void *arg);
                        //成功返回0,出错非0
  • tid: 存储函数创建的线程ID;
  • attr: 新创建线程的属性变量,默认为NULL;
  • f: 线程例程;
  • arg: 线程的输入变量,是创建线程和被创建线程间传递数据的一个途径。

线程可以调用下面的函数可以获得自己的线程ID:

#include <pthread.h>
pthread_t pthread_self(void);

3.2 终止线程

一个线程总是通过以下4种方式之一来终止的:

  1. 当顶层的线程例程返回时,线程会隐式地终止;
  2. 通过调用pthread_exit函数,线程会显示地终止。如果是主线程(每个进程开始生命周期时都是单一线程,这个线程就是“主线程”,主线程调用线程创建函数得到的线程称为“对等线程”)调用的这个函数,那么它会等待所有其它对等线程终止,然后再终止主线程和整个进程,返回值为thread_return
#include <pthread.h>
void pthread_exit(void *thread_return);
                    //从不返回
  1. 某个对等线程调用exit函数,它会终止该进程及其所有线程。
  2. 某个对等线程通过一个“对等线程ID”作为参数调用pthread_cancel函数来终止这个线程。
#include <pthread.h>
int pthread_cancel(pthread_t tid);
                    //成功返回0,出错为非0

3.3 回收线程资源

线程调用pthread_join函数等待其它线程终止。

#include <pthread.h>
int pthread_join(pthread_t tid, void **thread_return);
                    //成功返回0,出错为非0

该函数会阻塞等待线程tid终止,并可以将线程例程返回的通用指针赋值到thread_return指向的位置,然后回收已终止线程占用的所有内存资源。注意:和wait函数不同的是该函数只等待tid指定的单一线程,而不能等待任意一个线程终止。

3.4 分离线程

线程有两种属性:可结合的(joinable)和分离的(detached)。前者是默认的线程属性,它能够被其它线程收回和杀死,在被回收前它的内存资源是不释放的;后者通过函数pthread_detach来设置,此时的线程被称为“分离的线程”,它是不能其它线程回收或杀死的,它的内存资源在其终止时由系统自动回收。

为了避免内存泄漏,每个可结合线程都应该要么被其它线程显示收回,要么通过调用pthread_detach函数分离。

#include <pthread.h>
int pthread_detach(pthread_t pid);
                    //成功返回0,出错为非0

线程可以通过调用pthread_detach(pthread_self());来分离自己。

3.5 初始化线程

pthread_once函数允许初始化与线程例程相关的状态。

#include <pthread.h>
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t * &once_control,
                void (*init_routine)(void));

once_control是一个全局变量,它总是被初始化为PTHREAD_ONCE_INIT。当第一次调用函数时,它会调用init_routine函数,它是一个没有返回值和输入值的函数。再接下来对pthread_once的调用不做任何事。一般应用于需要动态初始化多个线程共享的全局变量时。

3.6 基于线程的并发服务器

下面思考如何编写一个基于线程的并发服务器的代码,假设由主线程循环等待客户端连接,一旦请求到来就创建一个对等线程处理该请求,而后返回继续等待其它客户端的连接请求。

  • 主线程如何将已连接描述符传递给对等线程的?

    首先,主线程调用Accept获得已连接描述符connfd;

    其次,主线程再通过已连接描述符的地址作为参数调用线程创建函数Pthread_creat

    最后,让对等线程间接引用这个指针,并将它赋值给一个局部变量。

connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
Pthread_creat(&tid, NULL, thread, &connfd);

void *thread(void *vargp){
    int connfd = *((int *)vargp);
    ...
}

注意: 该思路貌似对的,但如此一来会在Accept函数和赋值语句connfd = *((int *)vargp);之间引起竞争。因为若在某一次连接请求到来,Accept返回一个已连接描述符,接着就调用线程创建函数创建新的线程来处理该连接,之后等待下一次连接请求。假设这次线程中的赋值语句还没开始执行前,一个连接请求又到来了,且Accept又再次返回了一个已连接描述符存进了connfd变量中,此时,线程中的赋值语句就会得到一个错误的描述符。

  • 为了解决这个问题,我们必须将accept返回的每个已连接描述符分配到一个动态分配的内存块中去,正如下面代码的第25和26行所示:
/* 
 * echoservert.c - A concurrent echo server using threads
 */
/* $begin echoservertmain */
#include "csapp.h"

void echo(int connfd);
void *thread(void *vargp);

int main(int argc, char **argv) 
{
    int listenfd, *connfdp;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    pthread_t tid; 

    if (argc != 2) {
    fprintf(stderr, "usage: %s <port>\n", argv[0]);
    exit(0);
    }
    listenfd = Open_listenfd(argv[1]);

    while (1) {
        clientlen=sizeof(struct sockaddr_storage);
        connfdp = Malloc(sizeof(int)); 
        *connfdp = Accept(listenfd, (SA *) &clientaddr, &clientlen); 
        Pthread_create(&tid, NULL, thread, connfdp);
    }
}

/* Thread routine */
void *thread(void *vargp) 
{  
    int connfd = *((int *)vargp);
    Pthread_detach(pthread_self()); //line:conc:echoservert:detach
    Free(vargp);                    //line:conc:echoservert:free
    echo(connfd);
    Close(connfd);
    return NULL;
}
/* $end echoservertmain */

获取更多知识,请点击关注:
嵌入式Linux&ARM
CSDN博客
简书博客
知乎专栏