- A+
一.前言
从上个世纪到现在,工程师们在优化服务器性能的过程中,提出了各种不同的io模型,比如非阻塞io,io复用,信号驱动式io,异步io。具体io模型在不同平台上的实现也不一样,比如io复用在bsd上可以由kqueue实现,在solaris系统上可以由/dev/poll实现。为了实现系统的可移植性,POSIX 确保 select和poll在 unix-like系统上得到广泛的支持。
在上个世纪,Dan Kegel 提出了C10K的设想,现在C10K 已经不是什么问题,比如nginx就可以做到百万级别的qps。于是又有人提出来了C10M的设想,Robert David Graham 从unix的最初设计初衷给出了自己的解决方案。
二.常见io模型
1.阻塞io
常见的read系统调用,是最常见的阻塞io:
2.非阻塞式io
非阻塞io的典型使用方式如下,设置非阻塞标志,并且常与io复用一起使用,使用起来比较复杂。
val = Fcntl(sockfd, F_GETFL, 0); Fcntl(sockfd, F_SETFL, val | O_NONBLOCK); /* O_NONBLOCK 标志非阻塞 */
3.io 复用 (select/poll)
io复用在处理数量庞大的fd时非常有效,我们以select为例,select的核心api是select函数:
int select(int nfds, fd_set *_Nullable restrict readfds, fd_set *_Nullable restrict writefds, fd_set *_Nullable restrict exceptfds, struct timeval *_Nullable restrict timeout);
看一个例子:
#include "unp.h" void str_cli(FILE *fp, int sockfd) { int maxfdp1; fd_set rset; char sendline[MAXLINE], recvline[MAXLINE]; FD_ZERO(&rset); for ( ; ; ) { FD_SET(fileno(fp), &rset); /* 设置要监听的socket fd */ FD_SET(sockfd, &rset); /* 设置要监听的file fd */ maxfdp1 = max(fileno(fp), sockfd) + 1; Select(maxfdp1, &rset, NULL, NULL, NULL); /* select 调用 */ if (FD_ISSET(sockfd, &rset)) { /* socket 可读 */ if (Readline(sockfd, recvline, MAXLINE) == 0) err_quit("str_cli: server terminated prematurely"); Fputs(recvline, stdout); } if (FD_ISSET(fileno(fp), &rset)) { /* input 可读 */ if (Fgets(sendline, MAXLINE, fp) == NULL) return; /* all done */ Writen(sockfd, sendline, strlen(sendline)); } } }
4.信号驱动式io
但凡涉及到信号的程序都比较复杂。要使用信号驱动式io,先开启socket的信号驱动式io功能,并通过sigaction 系统调用安装一个信号处理函数:
void dg_echo(int sockfd_arg, SA *pcliaddr, socklen_t clilen_arg) { int i; const int on = 1; sigset_t zeromask, newmask, oldmask; sockfd = sockfd_arg; clilen = clilen_arg; for (i = 0; i < QSIZE; i++) { /* init queue of buffers */ dg[i].dg_data = Malloc(MAXDG); dg[i].dg_sa = Malloc(clilen); dg[i].dg_salen = clilen; } iget = iput = nqueue = 0; Signal(SIGHUP, sig_hup); /* 安装信号处理函数 */ Signal(SIGIO, sig_io); Fcntl(sockfd, F_SETOWN, getpid()); /* 设置属主 */ Ioctl(sockfd, FIOASYNC, &on); /* 开启信号驱动式io */ Ioctl(sockfd, FIONBIO, &on); /* non-bloking */ Sigemptyset(&zeromask); /* init three signal sets */ Sigemptyset(&oldmask); Sigemptyset(&newmask); Sigaddset(&newmask, SIGIO); /* signal we want to block */ Sigprocmask(SIG_BLOCK, &newmask, &oldmask); for ( ; ; ) { while (nqueue == 0) sigsuspend(&zeromask); /* wait for datagram to process */ /* 4unblock SIGIO */ Sigprocmask(SIG_SETMASK, &oldmask, NULL); Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0, dg[iget].dg_sa, dg[iget].dg_salen); if (++iget >= QSIZE) iget = 0; /* 4block SIGIO */ Sigprocmask(SIG_BLOCK, &newmask, &oldmask); nqueue--; } }
5.异步io
我们来看一个aio的例子(由于aio的例子过于复杂,我们这里只截取部分关键代码):
for (i = 0; i < NBUF; i++) { switch (bufs[i].op) { case UNUSED: /* * Read from the input file if more data * remains unread. */ if (off < sbuf.st_size) { bufs[i].op = READ_PENDING; bufs[i].aiocb.aio_fildes = ifd; bufs[i].aiocb.aio_offset = off; off += BSZ; if (off >= sbuf.st_size) bufs[i].last = 1; bufs[i].aiocb.aio_nbytes = BSZ; if (aio_read(&bufs[i].aiocb) < 0) /* aio_read */ err_sys("aio_read failed"); aiolist[i] = &bufs[i].aiocb; numop++; } break; case READ_PENDING: if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */ continue; if (err != 0) { if (err == -1) err_sys("aio_error failed"); else err_exit(err, "read failed"); } /* * A read is complete; translate the buffer * and write it. */ if ((n = aio_return(&bufs[i].aiocb)) < 0) /* 调用aio_return成功则 说明数据已经返回 */ err_sys("aio_return failed"); if (n != BSZ && !bufs[i].last) err_quit("short read (%d/%d)", n, BSZ); for (j = 0; j < n; j++) bufs[i].data[j] = translate(bufs[i].data[j]); bufs[i].op = WRITE_PENDING; bufs[i].aiocb.aio_fildes = ofd; bufs[i].aiocb.aio_nbytes = n; if (aio_write(&bufs[i].aiocb) < 0) /* aio_write */ err_sys("aio_write failed"); /* retain our spot in aiolist */ break; case WRITE_PENDING: if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */ continue; if (err != 0) { if (err == -1) err_sys("aio_error failed"); else err_exit(err, "write failed"); } /* * A write is complete; mark the buffer as unused. */ if ((n = aio_return(&bufs[i].aiocb)) < 0) err_sys("aio_return failed"); if (n != bufs[i].aiocb.aio_nbytes) err_quit("short write (%d/%d)", n, BSZ); aiolist[i] = NULL; bufs[i].op = UNUSED; numop--; break; } }
6.同步和异步的分类
网络上对io同步和异步的争论很多,这里给出Stevens的分类标准:
同步 | 阻塞io,非阻塞io,io复用,信号驱动式io |
异步 | 异步io |
三.C10K io策略
在上个世纪,Dan Kegel 提出了C10K的设想,即单机实现10k的并发量,主要提出了以下四种类型的解决方法:
服务器范式 | 例子 | 备注 | 软件实现 |
Serve many clients with each thread, and use nonblocking I/O(level-triggered) | select, poll(posix), /dev/poll(solaris), kqueue(bsd) | 轮询 | |
Serve many clients with each thread, and use nonblocking I/O (readiness change) | kqueue(bsd), epoll(linux), Realtime Signals(linux) | 事件通知 | nginx, redis |
Serve many clients with each server thread, and use asynchronous I/O | aio | 异步,没有得到广泛支持 | |
Serve one client with each server thread |
LinuxThreads, Java threading support in JDK 1.3.x and earlier |
早期的java使用绿色线程 |
- 在实现的过程中有诸多限制,比如打开fd的限制,创建thread数量的限制,根据不同内核而异。
- 32 位系统,用户态的虚拟空间只有3G,如果创建线程时分配的栈空间是10M,那么一个进程最多只能创建300 个左右的线程。 64 位系统,用户态的虚拟空间大到有128T,理论上不会受虚拟内存大小的限制(10M个线程),而会受系统的参数或性能限制(线程上下文切换)。
四.C10M
Robert David Graham认为如果要解决C10M的问题,必须对unix内核进行改造。当下的unix系统的设计目标是为了满足非常广泛的需求,于是加上了许多通用的功能,比如进程管理,内存管理等等。C10M的问题不是通用的问题,需要自己处理数据控制,而不是依赖unix内核,而且需要做到packet scalability, multi-core scalability, memory scalability。
专项问题,需要特殊的解决方案。
五.总结
本文从常见io模型出发,梳理了高并发服务器可能涉及到的io模型,这些经典io模型在过去十年基本没有发生变化。了解这些底层技术对我们了解深入理解服务器是非常有必要的。
六.参考
http://www.kegel.com/c10k.html#threads.java
http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html
https://man7.org/linux/man-pages/man2/select.2.html