专栏名称: 马哥Linux运维
马哥linux致力于linux运维培训,连续多年排名第一,订阅者可免费获得学习机会和相关Linux独家实战资料!
目录
相关文章推荐
51好读  ›  专栏  ›  马哥Linux运维

Linux网络编程“惊群”问题总结

马哥Linux运维  · 公众号  · 运维  · 2017-06-25 08:01

正文


1、前言

我从事Linux系统下网络开发将近4年了,经常还是遇到一些问题,只是知其然而不知其所以然,有时候和其他人交流,搞得非常尴尬。如今计算机都是多核了,网络编程框架也逐步丰富多了,我所知道的有多进程、多线程、异步事件驱动常用的三种模型。最经典的模型就是Nginx中所用的Master-Worker多进程异步驱动模型。今天和大家一起讨论一下网络开发中遇到的“惊群”现象。之前只是听说过这个现象,网上查资料也了解了基本概念,在实际的工作中还真没有遇到过。今天周末,结合自己的理解和网上的资料,彻底将“惊群”弄明白。需要弄清楚如下几个问题:

(1)什么是“惊群”,会产生什么问题?

(2)“惊群”的现象怎么用代码模拟出来?

(3)如何处理“惊群”问题,处理“惊群”后的现象又是怎么样呢?

2、何为惊群

如今网络编程中经常用到多进程或多线程模型,大概的思路是父进程创建socket,bind、listen后,通过fork创建多个子进程,每个子进程继承了父进程的socket,调用accpet开始监听等待网络连接。这个时候有多个进程同时等待网络的连接事件,当这个事件发生时,这些进程被同时唤醒,就是“惊群”。这样会导致什么问题呢?我们知道进程被唤醒,需要进行内核重新调度,这样每个进程同时去响应这一个事件,而最终只有一个进程能处理事件成功,其他的进程在处理该事件失败后重新休眠或其他。网络模型如下图所示:

简而言之,惊群现象(thundering herd)就是当多个进程和线程在同时阻塞等待同一个事件时,如果这个事件发生,会唤醒所有的进程,但最终只可能有一个进程/线程对该事件进行处理,其他进程/线程会在失败后重新休眠,这种性能浪费就是惊群。

3、编码模拟“惊群”现象

我们已经知道了“惊群”是怎么回事,那么就按照上面的图编码实现看一下效果。我尝试使用多进程模型,创建一个父进程绑定一个端口监听socket,然后fork出多个子进程,子进程们开始循环处理(比如accept)这个socket。测试代码如下所示:

 1 #include

 2 #include

 3 #include  

 4 #include  

 5 #include  

 6 #include  

 7 #include  

 8 #include

 9 #include

10 #include

11 

12 #define IP   "127.0.0.1"

13 #define PORT  8888

14 #define WORKER 4

15 

16 int worker(int listenfd, int i)

17 {

18     while (1) {

19         printf("I am worker %d, begin to accept connection.\n", i);

20         struct sockaddr_in client_addr;  

21         socklen_t client_addrlen = sizeof( client_addr );  

22         int connfd = accept( listenfd, ( struct sockaddr* )&client_addr, &client_addrlen );  

23         if (connfd != -1) {

24             printf("worker %d accept a connection success.\t", i);

25             printf("ip :%s\t",inet_ntoa(client_addr.sin_addr));

26             printf("port: %d \n",client_addr.sin_port);

27         } else {

28             printf("worker %d accept a connection failed,error:%s", i, strerror(errno));

         close(connfd);

29         }

30     }

31     return 0;

32 }

33 

34 int main()

35 {

36     int i = 0;

37     struct sockaddr_in address;  

38     bzero(&address, sizeof(address));  

39     address.sin_family = AF_INET;  

40     inet_pton( AF_INET, IP, &address.sin_addr);  

41     address.sin_port = htons(PORT);  

42     int listenfd = socket(PF_INET, SOCK_STREAM, 0);  

43     assert(listenfd >= 0);  

44 

45     int ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));  

46     assert(ret != -1);  

47 

48     ret = listen(listenfd, 5);  

49     assert(ret != -1);  

50 

51     for (i = 0; i < WORKER; i++) {

52         printf("Create worker %d\n", i+1);

53         pid_t pid = fork();

54         /*child  process */

55         if (pid == 0) {

56             worker(listenfd, i);

57         }

58 

59         if (pid < 0) {

60             printf("fork error");

61         }

62     }

63 

64     /*wait child process*/

65     int status;

66     wait(&status);

67     return 0;

68 }

编译执行,在本机上使用telnet 127.0.0.1 8888测试,结果如下所示:

按照“惊群"现象,期望结果应该是4个子进程都会accpet到请求,其中只有一个成功,另外三个失败的情况。而实际的结果显示,父进程开始创建4个子进程,每个子进程开始等待accept连接。当telnet连接来的时候,只有worker2 子进程accpet到请求,而其他的三个进程并没有接收到请求。

这是什么原因呢?难道惊群现象是假的吗?于是赶紧google查一下,惊群到底是怎么出现的。

其实在Linux2.6版本以后,内核内核已经解决了accept()函数的“惊群”问题,大概的处理方式就是,当内核接收到一个客户连接后, 只会唤醒等待队列上的第一个进程或线程 。所以,如果服务器采用accept阻塞调用方式,在最新的Linux系统上,已经没有“惊群”的问题了。

但是,对于实际工程中常见的服务器程序,大都使用select、poll或epoll机制,此时,服务器不是阻塞在accept,而是阻塞在select、poll或epoll_wait,这种情况下的“惊群”仍然需要考虑。接下来以epoll为例分析:

使用epoll非阻塞实现代码如下所示:

  1 #include

  2 #include

  3 #include

  4 #include

  5 #include

  6 #include

  7 #include

  8 #include

  9 #include

 10 #include

 11 #include

 12 #include

 13 

 14 #define IP   "127.0.0.1"

 15 #define PORT  8888

 16 #define PROCESS_NUM 4

 17 #define MAXEVENTS 64

 18 

 19 static int create_and_bind ()

 20 {

 21     int fd = socket(PF_INET, SOCK_STREAM, 0);

 22     struct sockaddr_in serveraddr;

 23     serveraddr.sin_family = AF_INET;

 24     inet_pton( AF_INET, IP, &serveraddr.sin_addr);  

 25     serveraddr.sin_port = htons(PORT);

 26     bind(fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));

 27     return fd;

 28 }

 29 

 30 static int make_socket_non_blocking (int sfd)

 31 {

 32     int flags, s;

 33     flags = fcntl (sfd, F_GETFL, 0);

 34     if (flags == -1) {

 35         perror ("fcntl");

 36         return -1;

 37     }

 38     flags |= O_NONBLOCK;

 39     s = fcntl (sfd, F_SETFL, flags);

 40     if (s == -1) {

 41         perror ("fcntl");

 42         return -1;

 43     }

 44     return 0;

 45 }

 46 

 47 void worker(int sfd, int efd, struct epoll_event *events, int k) {

 48     /* The event loop */

 49     while (1) {

 50         int n, i;

 51         n = epoll_wait(efd, events, MAXEVENTS, -1);

 52         printf("worker  %d return from epoll_wait!\n", k);

 53         for (i = 0; i < n; i++) {

 54             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events &EPOLLIN))) {

 55                 /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */

 56                 fprintf (stderr, "epoll error\n");

 57                 close (events[i].data.fd);

 58                 continue;

 59             } else if (sfd == events[i].data.fd) {

 60                 /* We have a notification on the listening socket, which means one or more incoming connections. */

 61                 struct sockaddr in_addr;

 62                 socklen_t in_len;

 63                 int infd;

 64                 char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

 65                 in_len = sizeof in_addr;

 66                 infd = accept(sfd, ∈_addr, ∈_len);

 67                 if (infd == -1) {

 68                     printf("worker %d accept failed!\n", k);

 69                     break;

 70                 }

 71                 printf("worker %d accept successed!\n", k);

 72                 /* Make the incoming socket non-blocking and add it to the list of fds to monitor. */

 73                 close(infd); 

 74             }

 75         }

 76     }

 77 }

 78 

 79 int main (int argc, char *argv[])

 80 {

 81     int sfd, s;

 82     int efd;

 83     struct epoll_event event;

 84     struct epoll_event *events;

 85     sfd = create_and_bind();

 86     if (sfd == -1) {

 87         abort ();

 88     }

 89     s = make_socket_non_blocking (sfd);

 90     if (s == -1) {

 91         abort ();

 92     }

 93     s = listen(sfd, SOMAXCONN);

 94     if (s == -1) {

 95         perror ("listen");

 96         abort ();

 97     }

 98     efd = epoll_create(MAXEVENTS);

 99     if (efd == -1) {

100         perror("epoll_create");

101         abort();

102     }

103     event.data.fd = sfd;

104     event.events = EPOLLIN;

105     s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);

106     if (s == -1) {

107         perror("epoll_ctl");

108         abort();

109     }

110 

111     /* Buffer where events are returned */

112     events = calloc(MAXEVENTS, sizeof event);

113     int k;

114     for(k = 0; k < PROCESS_NUM; k++) {

115         printf("Create worker %d\n", k+1);

116         int pid = fork();

117         if(pid == 0) {

118             worker(sfd, efd, events, k);

119         }

120     }

121     int status;

122     wait(&status);

123     free (events);

124     close (sfd);

125     return EXIT_SUCCESS;

126 }

父进程中创建套接字,并设置为非阻塞,开始listen。然后fork出4个子进程,在worker中调用epoll_wait开始accpet连接。使用telnet测试结果如下:

从结果看出,与上面是一样的,只有一个进程接收到连接,其他三个没有收到,说明没有发生惊群现象。这又是为什么呢?

在早期的Linux版本中,内核对于阻塞在epoll_wait的进程,也是采用全部唤醒的机制,所以存在和accept相似的“惊群”问题。新版本的的解决方案也是 只会唤醒等待队列上的第一个进程或线程 ,所以,新版本Linux  部分的 解决了epoll的“惊群”问题。所谓 部分的 解决,意思就是:对于部分特殊场景,使用epoll机制,已经不存在“惊群”的问题了,但是对于大多数场景,epoll机制仍然存在“惊群”。

epoll存在惊群的场景如下:在worker保持工作的状态下,都会被唤醒,例如在epoll_wait后调用sleep一次。改写woker函数如下:

void worker(int sfd, int efd, struct epoll_event *events, int k) {

    /* The event loop */

    while (1) {

        int n, i;

        n = epoll_wait(efd, events, MAXEVENTS, -1);

        /*keep running*/

        sleep(2);

        printf("worker  %d return from epoll_wait!\n", k); 

        for (i = 0; i < n; i++) {

            if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events &EPOLLIN))) {

                /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */

                fprintf (stderr, "epoll error\n");

                close (events[i].data.fd);

                continue;

            } else if (sfd == events[i].data.fd) {

                /* We have a notification on the listening socket, which means one or more incoming connections. */

                struct sockaddr in_addr;

                socklen_t in_len;

                int infd;

                char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                in_len = sizeof in_addr;

                infd = accept(sfd, ∈_addr, ∈_len);

                if (infd == -1) {

                    printf("worker %d accept failed,error:%s\n", k, strerror(errno));

                    break;

                }   

                printf("worker %d accept successed!\n", k); 

                /* Make the incoming socket non-blocking and add it to the list of fds to monitor. */

                close(infd); 

            }   

        }   

    }   

}


测试结果如下所示:

终于看到惊群现象的出现了。

4、解决惊群问题

Nginx中使用mutex互斥锁解决这个问题,具体措施有使用全局互斥锁,每个子进程在epoll_wait()之前先去申请锁,申请到则继续处理,获取不到则等待,并设置了一个负载均衡的 算法 (当某一个子进程的任务量达到总设置量的7/8时,则不会再尝试去申请锁)来均衡各个进程的任务量。后面深入学习一下Nginx的惊群处理过程。

作者:Anker

来源:http://www.cnblogs.com/Anker/p/7071849.html


——马哥教育Linux网络班27期开班倒计时1天——

——添加课程顾问,立刻领取专属周边——

——马哥教育,让你懂更让你行——