一个基于 NATS 消息系统的项目,多个消息的提供者 (pub)和订阅者(sub)都连到 NATS 消息系统,通过这个系统来完成消息的投递和订阅处理。
突然有一天,线上报了一个故障,一个流程不能正常处理。经排查,发现消息正确地投递到了 NATS 服务端,但是消息订阅者没有收到该消息,也没能做出处理,导致流程没能进行下去。
通过观察消息订阅者后发现,消息订阅者到 NATS 服务端的连接虽然显示是“正常”的,但实际上,这个连接已经是无效的了。
为什么呢?这是因为 NATS 服务器崩溃过,NATS 服务器和消息订阅者之间的连接中断 FIN 包,由于异常情况,没能够正常到达消息订阅者,这样造成的结果就是消息订阅者一直维护着一个“过时的”连接,不会收到 NATS 服务器发送来的消息
这个故障的根本原因在于,作为 NATS 服务器的客户端,消息订阅者没有及时对连接的有效性进行检测,这样就造成了问题。
保持对连接有效性的检测,是在实战中必须要注意的一个点。

TCP Keep-Alive 选项

在没有数据读写的“静默”的连接上,是没有办法发现 TCP 连接是有效还是无效的。比如客户端突然崩溃,服务器端可能在几天内都维护着一个无用的 TCP 连接。前面提到的例子就是这样的一个场景。
定义一个时间段,在这个时间段内,如果没有任何连接相关的活动,TCP 保活机制会开始作用,每隔一个时间间隔,发送一个探测报文,该探测报文包含的数据非常少,如果连续几个探测报文都没有得到响应,则认为当前的 TCP 连接已经死亡,系统内核将错误信息通知给上层应用程序
保活时间保活时间间隔保活探测次数。在 Linux 系统中,这些变量分别对应 sysctl 变量net.ipv4.tcp_keepalive_timenet.ipv4.tcp_keepalive_intvl net.ipv4.tcp_keepalve_probes。默认设置是 7200 秒(2小时)、75秒9次探测

如果开启了 TCP 保活,需要考虑以下几种情况:
  1. 对端程序是正常工作的。当 TCP 保活的探测报文发送给对端, 对端会正常响应,这样 TCP 保活时间会被重置,等待下一个 TCP 保活时间的到来。
  2. 对端程序崩溃并重启。当 TCP 保活的探测报文发送给对端后,对端是可以响应的,但由于没有该连接的有效信息,会产生一个 RST 报文,这样很快就会发现 TCP 连接已经被重置。
  3. 对端程序崩溃,或对端由于其他原因导致报文不可达。当 TCP 保活的探测报文发送给对端后,石沉大海,没有响应,连续几次,达到保活探测次数后,TCP 会报告该 TCP 连接已经死亡。
TCP 保活机制默认是关闭的,当我们选择打开时,可以分别在连接的两个方向上开启,也可以单独在一个方向上开启
  • 开启服务器端到客户端的检测:就可以在客户端非正常断连的情况下 清除在服务器端保留的“脏数据”;
  • 开启客户端到服务器端的检测:就可以在服务器无响应的情况下,重新发起连接。

应用层探活

如果使用 TCP 自身的 keep-Alive 机制,在 Linux 系统中,最少需要经过 2 小时 11 分 15 秒才可以发现一个“死亡”连接。这个时间是怎么计算出来的呢?其实是通过 2 小时,加上 75 秒乘以 9 的总和。实际上,对很多对时延要求敏感的系统中,这个时间间隔是不可接受的。
可以通过在应用程序中模拟 TCP Keep-Alive 机制,来完成在应用层的连接探活。
可以设计一个 PING-PONG 的机制,需要保活的一方,比如客户端,在保活时间达到后,发起对连接的 PING 操作,如果服务器端对 PING 操作有回应,则重新设置保活时间,否则对探测次数进行计数,如果最终探测次数达到了保活探测次数预先设置的值之后,则认为连接已经无效。

两个比较关键的点:
  1. 需要使用定时器,这可以通过使用 I/O 复用自身的机制来实现;
  2. 需要设计一个 PING-PONG 的协议

消息格式设计

这里的程序是客户端来发起保活,为此定义了一个消息对象。这个消息对象是一个结构体,前 4 个字节标识了消息类型,为了简单,这里设计了 MSG_PING、MSG_PONG、MSG_TYPE1、MSG_TYPE2 四种消息类型。
typedef struct {
    u_int32_t type;
    char data[1024];
} messageObject;

#define MSG_PING          1
#define MSG_PONG          2
#define MSG_TYPE1        11
#define MSG_TYPE2        21

客户端程序设计

客户端完全模拟 TCP Keep-Alive 的机制,在保活时间达到后,探活次数增加 1,同时向服务器端发送 PING 格式的消息,此后以预设的保活时间间隔,不断地向服务器端发送 PING 格式的消息。如果能收到服务器端的应答,则结束保活,将保活时间置为 0。
#include "lib/common.h"
#include "message_objecte.h"

#define    MAXLINE     4096
#define    KEEP_ALIVE_TIME  10
#define    KEEP_ALIVE_INTERVAL  3
#define    KEEP_ALIVE_PROBETIMES  3


int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: tcpclient <IPaddress>");
    }

    int socket_fd;
    socket_fd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET, argv[1], &server_addr.sin_addr);

    socklen_t server_len = sizeof(server_addr);
    int connect_rt = connect(socket_fd, (struct sockaddr *) &server_addr, server_len);
    if (connect_rt < 0) {
        error(1, errno, "connect failed ");
    }

    char recv_line[MAXLINE + 1];
    int n;

    fd_set readmask;
    fd_set allreads;

    struct timeval tv;
    int heartbeats = 0;

    tv.tv_sec = KEEP_ALIVE_TIME;
    tv.tv_usec = 0;

    messageObject messageObject;

    FD_ZERO(&allreads);
    FD_SET(socket_fd, &allreads);
    for (;;) {
        readmask = allreads;
        int rc = select(socket_fd + 1, &readmask, NULL, NULL, &tv);
        if (rc < 0) {
            error(1, errno, "select failed");
        }
        if (rc == 0) {
            if (++heartbeats > KEEP_ALIVE_PROBETIMES) {
                error(1, 0, "connection dead\n");
            }
            printf("sending heartbeat #%d\n", heartbeats);

            messageObject.type = htonl(MSG_PING);
            rc = send(socket_fd, (char *) &messageObject, sizeof(messageObject), 0);
            if (rc < 0) {
                error(1, errno, "send failure");
            }

            tv.tv_sec = KEEP_ALIVE_INTERVAL;
            continue;
        }
        if (FD_ISSET(socket_fd, &readmask)) {
            n = read(socket_fd, recv_line, MAXLINE);
            if (n < 0) {
                error(1, errno, "read error");
            } else if (n == 0) {
                error(1, 0, "server terminated \n");
            }
            printf("received heartbeat, make heartbeats to 0 \n");

            heartbeats = 0;
            tv.tv_sec = KEEP_ALIVE_TIME;
        }
    }
}
第一部分为 套接字的创建和连接建立:
15-16 行:创建了 TCP 套接字;
18-22 行:创建了 IPv4 目标地址,其实就是服务器端地址,注意这里使用的是传入参数作为服务器地址;
24-28 行:向服务器端发起连接。
第二部分为 select 定时器准备:
39-40 行:设置了超时时间为 KEEP_ALIVE_TIME,这相当于保活时间;
44-45 行:初始化 select 函数的套接字;
第三部分为 处理心跳报文:
48 行:     调用 select 函数,感知 I/O 事件。这里的 I/O 事件,除了套接字上的读操作之外,还有在 39-40 行设置的超时事件。当 KEEP_ALIVE_TIME 这段时间到达之后,select 函数会返回 0,于是进入 53-63 行的处理;
53-63 行:客户端已经在 KEEP_ALIVE_TIME 这段时间内没有收到任何对当前连接的反馈,于是发起 PING 消息,尝试问服务器端:“喂,你还活着吗?”这里通过传送一个类型为 MSG_PING 的消息对象来完成 PING 操作,之后会看到服务器端程序如何响应这个 PING 操作;
65-74 行:客户端在接收到服务器端程序之后的处理。为了简单,这里就没有再进行报文格式的转换和分析。这里认为既然收到服务器端的报文,那么连接就是正常的,所以会对探活计数器和探活时间都置零,等待下一次探活时间的来临。(在实际的工作中,这里其实是需要对报文进行解析后处理的,只有是 PONG 类型的回应,我们才认为是 PING 探活的结果。)

服务器端程序设计

服务器端的程序接受一个参数,这个参数设置的比较大,可以模拟连接没有响应的情况(实际情况下,应该是系统崩溃,或者网络异常)。服务器端程序在接收到客户端发送来的各种消息后,进行处理,其中如果发现是 PING 类型的消息,在休眠一段时间后回复一个 PONG 消息,告诉客户端:“嗯,我还活着。
#include "lib/common.h"
#include "message_objecte.h"

static int count;

int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: tcpsever <sleepingtime>");
    }

    int sleepingTime = atoi(argv[1]);

    int listenfd;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(SERV_PORT);

    int rt1 = bind(listenfd, (struct sockaddr *) &server_addr, sizeof(server_addr));
    if (rt1 < 0) {
        error(1, errno, "bind failed ");
    }

    int rt2 = listen(listenfd, LISTENQ);
    if (rt2 < 0) {
        error(1, errno, "listen failed ");
    }

    int connfd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);

    if ((connfd = accept(listenfd, (struct sockaddr *) &client_addr, &client_len)) < 0) {
        error(1, errno, "bind failed ");
    }

    messageObject message;
    count = 0;

    for (;;) {
        int n = read(connfd, (char *) &message, sizeof(messageObject));
        if (n < 0) {
            error(1, errno, "error read");
        } else if (n == 0) {
            error(1, 0, "client closed \n");
        }

        printf("received %d bytes\n", n);
        count++;

        switch (ntohl(message.type)) {
            case MSG_TYPE1 :
                printf("process  MSG_TYPE1 \n");
                break;

            case MSG_TYPE2 :
                printf("process  MSG_TYPE2 \n");
                break;

            case MSG_PING: {
                messageObject pong_message;
                pong_message.type = MSG_PONG;

                sleep(sleepingTime);

                ssize_t rc = send(connfd, (char *) &pong_message, sizeof(pong_message), 0);
                if (rc < 0)
                    error(1, errno, "send failure");
                break;
            }

            default :
                error(1, 0, "unknown message type (%d)\n", ntohl(message.type));
        }
    }
}
第一部分为 监听过程的建立:
13-14 行:先创建一个本地 TCP 监听套接字;
16-20 行:绑定该套接字到本地端口和 ANY 地址上;
27-38 行:分别调用 listen 和 accept 完成被动套接字转换和监听;
第二部分为 从建立的连接套接字上读取数据,解析报文,根据消息类型进行不同的处理。
55-57 行:处理 MSG_TYPE1 的消息;
59-61 行:处理 MSG_TYPE2 的消息;
64-72 行:处理 MSG_PING 类型的消息。通过休眠来模拟响应是否及时,然后调用 send 函数发送一个 PONG 报文,向客户端表示“还活着”的意思;
74 行:     异常处理,因为消息格式不认识,所以程序出错退出。

实验

(1) 服务器端休眠时间为 60 秒。
$./pingclient 127.0.0.1
sending heartbeat #1
sending heartbeat #2
sending heartbeat #3
connection dead
$./pingserver 60
received 1028 bytes
received 1028 bytes
客户端在发送了三次心跳检测报文 PING 报文后,判断出连接无效,直接退出了。之所以造成这样的结果,是因为在这段时间内没有接收到来自服务器端的任何 PONG 报文。(当然,实际工作的程序,可能需要不一样的处理,比如重新发起连接)

(2) 服务器端休眠时间为 5 秒。
$./pingclient 127.0.0.1
sending heartbeat #1
sending heartbeat #2
received heartbeat, make heartbeats to 0
received heartbeat, make heartbeats to 0
sending heartbeat #1
sending heartbeat #2
received heartbeat, make heartbeats to 0
received heartbeat, make heartbeats to 0
$./pingserver 5
received 1028 bytes
received 1028 bytes
received 1028 bytes
received 1028 bytes
由于这一次服务器端在心跳检测过程中,及时地进行了响应,客户端一直都会认为连接是正常的。

总结

虽然 TCP 没有提供系统的保活能力,让应用程序可以方便地感知连接的存活,但是,可以在应用程序里灵活地建立这种机制。
一般来说,这种机制的建立依赖于系统定时器,以及恰当的应用层报文协议。(比如,使用心跳包就是这样一种保持 Keep Alive 的机制)

思考题

1. 针对 TCP 的探活,是否同样适用于 UDP 呢?
UDP是无连接的,如果为了连接而保活是不必要的,如果为了探测对端是否正常工作而做ping-pong也是可行的。
2. 有人说额外的探活报文占用了有限的带宽,对此你是怎么想的呢?
额外的探活报文是会占用一些带宽资源,可根据实际业务场景,适当增加保活时间,降低探活频率,简化ping-pong协议。
3. 为什么需要多次探活才能决定一个 TCP 连接是否已经死亡呢?
多次探活是为了 避免因ping包在网络中意外丢失,而误认为对端死亡。