多路io复用
目录
前夕
在网络的初期,网民很少,服务器完全无压力,那时的技术也没有现在先进,通常用一个线程来全程跟踪处理一个请求。因为这样最简单。
其实代码实现大家都知道,就是服务器上有个ServerSocket在某个端口监听,接收到客户端的连接后,会创建一个Socket,并把它交给一个线程进行后续处理。线程主要从Socket读取客户端传过来的数据,然后进行业务处理,并把结果再写入Socket传回客户端。由于网络的原因,Socket创建后并不一定能立刻从它上面读取数据,可能需要等一段时间,此时线程也必须一直阻塞着。在向Socket写入数据时,也可能会使线程阻塞。
附上Socket学习地址
https://blog.csdn.net/weixin_41563161/article/details/104779605
高性能IO模型浅析
服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种:
(1)同步阻塞IO(Blocking IO):即传统的IO模型。
(2)同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。注意这里所说的NIO并非Java的NIO(New IO)库。
(3)IO多路复用(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。
(4)异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。
同步和异步的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式:阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。
一、同步阻塞IO
如图1所示,用户线程通过系统调用read发起IO读操作,由用户空间转到内核空间。内核等到数据包到达后,然后将接收的数据拷贝到用户空间,完成read操作。
{
read(socket, buffer);
process(buffer);
}
即用户需要等待read将socket中的数据读取到buffer后,才继续处理接收的数据。整个IO请求的过程中,
用户线程是被阻塞的,这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。
二、同步非阻塞IO
同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK。这样做用户线程可以在发起IO请求后可以立即返回。
如图2所示,由于socket是非阻塞的方式,因此用户线程发起IO请求时立即返回。但并未读取到任何数据,用户线程需要不断地发起IO请求,直到数据到达后,才真正读取到数据,继续执行。
用户线程使用同步非阻塞IO模型的伪代码描述为:
{
while(read(socket, buffer) != SUCCESS)
;
process(buffer);
}
即用户需要不断地调用read,尝试读取socket中的数据,直到读取成功后,才继续处理接收的数据。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞IO这一特性。
三、IO多路复用
IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。
如图3所示,用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。
从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
{
select(socket);
while(1) {
sockets = select();
for(socket in sockets) {
if(can_read(socket)) {
read(socket, buffer);
process(buffer);
}
}
}
}
其中while循环前将socket添加到select监视中,然后在while内一直调用select获取被激活的socket,一旦socket可读,便调用read函数将socket中的数据读取出来。
通过Reactor的方式,可以将用户线程轮询IO操作状态的工作统一交给handle_events事件循环进行处理。用户线程注册事件处理器之后可以继续执行做其他的工作(异步),而Reactor线程负责调用内核的select函数检查socket状态。当有socket被激活时,则通知相应的用户线程(或执行用户线程的回调函数),执行handle_event进行数据读取、处理的工作。由于select函数是阻塞的,因此多路IO复用模型也被称为异步阻塞IO模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。一般在使用IO多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起IO请求时,数据已经到达了,用户线程一定不会被阻塞。
几个函数
来源:
深入了解参考资料:
- www.cnblogs.com/Anker/p/326…--->select、poll、epoll之间的区别总结[整理]
select
一、select实现
1、使用copy_from_user从用户空间拷贝fd_set到内核空间
2、注册回调函数__pollwait
3、遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
4、以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
5、__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
6、poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
7、如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
8、把fd_set从内核空间拷贝到用户空间。
https://blog.csdn.net/coolgw2015/article/details/79719328
文件描述符fd
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
I/O的socket操作也是一种文件描述符fd。
https://www.liangzl.com/get-article-detail-121935.html
将文件描述符(fd)放入一个集合中(fd_set
,实际上是一个long类型的数组),当调用select时,将fd_set
从用户空间拷贝到内核空间,由内核根据IO状态修改fd_set
的内容,由此来判断哪个fd对应的socket可读。它的好处时可以在同一线程内同时处理多个IO请求。
select具有以下缺点:
- 需要维护一个存放大量fd的数据结构,每次将
fd_set
从用户空间复制到内核空间,开销大。 - 维持的fd数量有限,一般32位机器默认是1024, 64位机器默认是2048(在linux内核头文件中, 有定义:
#define FD_SETSIZE 1024
),这是操作系统防止该操作造成系统性能受到太大影响。 - 对fd采用轮询扫描, 效率低
- 采用水平触发, 如果报告了fd后, 没有被处理, 那么下次处理时还会通知进程。
poll
本质上与select
没有差别,仅在fd存储方式有所不同,采用链表的方式存储(pollfd
),达到可以维持任意数量的fd。但是select
存在的其他缺点,它同样存在。
epoll
epoll的设计和实现与select完全不同。
综合的来说:
epoll在内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分。连接的套接字(socket句柄)是采用红黑树的结构存储在内核cache中的,并给内核中断处理程序注册一个回调函数,告诉内核:如果这个句柄的中断到了,就把它放到准备就绪list链表里。
当有事件准备就绪时,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了(epoll的基础是回调)。当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可;有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。
1)调用epoll_create建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源, 创建了红黑树和就绪链表)
2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字 (如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据)
3)调用epoll_wait收集发生的事件的连接 (立刻返回准备就绪链表里的数据)
两种模式LT和ET
ET是边缘触发,LT是水平触发,一个表示只有在变化的边际触发,一个表示在某个阶段都会触发。
LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件
。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件
。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了。所以,非ET的句柄,只要它上面还有事件,epoll_wait每次都会返回这个句柄。(从上面这段,可以看出,LT还有个回放的过程,低效了)
参考链接 https://blog.csdn.net/weixin_41563161/article/details/104087917
缓存 I/O
缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
缓存 I/O 的缺点:数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。
几个图像形象的表示
1、非阻塞、忙轮询
2. 方法2:select与poll
select 代收员 比较懒,她只会告诉你快递到了,但是是谁到的,你需要挨个快递员问一遍。每次调用select前都要重新初始化描述符集,将fd从用户态拷贝到内核态,每次调用select后,都需要将fd从内核态拷贝到用户态;poll基本上也相同,只不过poll没有最大文件描述符限制,poll解决了select重复初始化的问题。但仍旧是轮寻排查
方法3:epoll
什么是epoll
- 是IO多路转接技术
- 只需关心活跃的连接,无需遍历全部的描述符集合
- 能够处理大量的连接请求(系统可以打开的文件数目)
三种方式的区别
select,poll,epoll都是IO多路复用的机制。一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
https://segmentfault.com/a/1190000003063859;
https://blog.csdn.net/wxy941011/article/details/80274233
- select的本质是采用32个整数的32位,即32*32= 1024来标识,fd值为1-1024。当fd的值超过1024限制时,就必须修改FD_SETSIZE的大小。这个时候就可以标识32*max值范围的fd。
- poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
- epoll还是poll的一种优化,返回后不需要对所有的fd进行遍历,在内核中维持了fd的列表。select和poll是将这个内核列表维持在用户态,然后传递到内核中。与poll/select不同,epoll不再是一个单独的系统调用,而是由epoll_create/epoll_ctl/epoll_wait三个系统调用组成,epoll在2.6以后的内核才支持。
select/poll的几大缺点
- 针对select支持的文件描述符数量太小了,默认是1024, 相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但下面的缺点依然存在。
- 每次调用select/poll,都需要把fd集合从用户态拷贝到内核态, 并在内核中遍历传递进来的所有fd; 返回的的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件
-
select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
异步IO
“真正”的异步IO需要操作系统更强的支持。在IO多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。而在异步IO模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可。
异步IO模型使用了Proactor设计模式实现了这一机制。
异步IO模型中,用户线程直接使用内核提供的异步IO API发起read请求,且发起后立即返回,继续执行用户线程代码。不过此时用户线程已经将调用的AsynchronousOperation和CompletionHandler注册到内核,然后操作系统开启独立的内核线程去处理IO操作。当read请求的数据到达时,由内核负责读取socket中的数据,并写入用户指定的缓冲区中。最后内核将read的数据和用户线程注册的CompletionHandler分发给内部Proactor,Proactor将IO完成的信息通知给用户线程(一般通过调用用户线程注册的完成事件处理函数),完成异步IO。
void UserCompletionHandler::handle_event(buffer) {
process(buffer);
}
{
aio_read(socket, new UserCompletionHandler);
}
相比于IO多路复用模型,异步IO并不十分常用,不少高性能并发服务程序使用IO多路复用模型+多线程任务处理的架构基本可以满足需求。况且目前操作系统对异步IO的支持并非特别完善,更多的是采用IO多路复用模型模拟异步IO的方式(IO事件触发时不直接通知用户线程,而是将数据读写完毕后放到用户指定的缓冲区中)。
普通IO 程序
这里准备了一个示例,主要逻辑如下:
客户端:创建20个Socket并连接到服务器上,再创建20个线程,每个线程负责一个Socket。
服务器端:接收到这20个连接,创建20个Socket,接着创建20个线程,每个线程负责一个Socket。
为了模拟服务器端的Socket在创建后不能立马读取数据,让客户端的20个线程分别休眠5-10之间的一个随机秒数。
客户端的20个线程会在第5秒到第10秒这段时间内陆陆续续的向服务器端发送数据,服务器端的20个线程也会陆陆续续接收到数据。
public class BioServer {
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress("localhost", 8080));
while (true) {
Socket s = ss.accept();
processWithNewThread(s);
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void processWithNewThread(Socket s) {
Runnable run = () -> {
InetSocketAddress rsa = (InetSocketAddress)s.getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + counter.incrementAndGet());
try {
String result = readBytes(s.getInputStream());
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.getAndDecrement());
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}
static String readBytes(InputStream is) throws Exception {
long start = 0;
int total = 0;
int count = 0;
byte[] bytes = new byte[1024];
//开始读数据的时间
long begin = System.currentTimeMillis();
while ((count = is.read(bytes)) > -1) {
if (start < 1) {
//第一次读到数据的时间
start = System.currentTimeMillis();
}
total += count;
}
//读完数据的时间
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}
static String time() {
return sdf.format(new Date());
}
}
public class Client {
public static void main(String[] args) {
try {
for (int i = 0; i < 20; i++) {
Socket s = new Socket();
s.connect(new InetSocketAddress("localhost", 8080));
processWithNewThread(s, i);
}
} catch (IOException e) {
e.printStackTrace();
}
}
static void processWithNewThread(Socket s, int i) {
Runnable run = () -> {
try {
//睡眠随机的5-10秒,模拟数据尚未就绪
Thread.sleep((new Random().nextInt(6) + 5) * 1000);
//写1M数据,为了拉长服务器端读数据的过程
s.getOutputStream().write(prepareBytes());
//睡眠1秒,让服务器端把数据读完
Thread.sleep(1000);
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}
static byte[] prepareBytes() {
byte[] bytes = new byte[1024*1024*1];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 1;
}
return bytes;
}
}
执行结果如下:
时间->IP:Port->线程Id:当前线程数
15:11:52->127.0.0.1:55201->10:1
15:11:52->127.0.0.1:55203->12:2
15:11:52->127.0.0.1:55204->13:3
15:11:52->127.0.0.1:55207->16:4
15:11:52->127.0.0.1:55208->17:5
15:11:52->127.0.0.1:55202->11:6
15:11:52->127.0.0.1:55205->14:7
15:11:52->127.0.0.1:55206->15:8
15:11:52->127.0.0.1:55209->18:9
15:11:52->127.0.0.1:55210->19:10
15:11:52->127.0.0.1:55213->22:11
15:11:52->127.0.0.1:55214->23:12
15:11:52->127.0.0.1:55217->26:13
15:11:52->127.0.0.1:55211->20:14
15:11:52->127.0.0.1:55218->27:15
15:11:52->127.0.0.1:55212->21:16
15:11:52->127.0.0.1:55215->24:17
15:11:52->127.0.0.1:55216->25:18
15:11:52->127.0.0.1:55219->28:19
15:11:52->127.0.0.1:55220->29:20
时间->等待数据的时间,读取数据的时间,总共读取的字节数->线程Id:当前线程数
15:11:58->wait=5012ms,read=1022ms,total=1048576bs->17:20
15:11:58->wait=5021ms,read=1022ms,total=1048576bs->13:19
15:11:58->wait=5034ms,read=1008ms,total=1048576bs->11:18
15:11:58->wait=5046ms,read=1003ms,total=1048576bs->12:17
15:11:58->wait=5038ms,read=1005ms,total=1048576bs->23:16
15:11:58->wait=5037ms,read=1010ms,total=1048576bs->22:15
15:11:59->wait=6001ms,read=1017ms,total=1048576bs->15:14
15:11:59->wait=6016ms,read=1013ms,total=1048576bs->27:13
15:11:59->wait=6011ms,read=1018ms,total=1048576bs->24:12
15:12:00->wait=7005ms,read=1008ms,total=1048576bs->20:11
15:12:00->wait=6999ms,read=1020ms,total=1048576bs->14:10
15:12:00->wait=7019ms,read=1007ms,total=1048576bs->26:9
15:12:00->wait=7012ms,read=1015ms,total=1048576bs->21:8
15:12:00->wait=7023ms,read=1008ms,total=1048576bs->25:7
15:12:01->wait=7999ms,read=1011ms,total=1048576bs->18:6
15:12:02->wait=9026ms,read=1014ms,total=1048576bs->10:5
15:12:02->wait=9005ms,read=1031ms,total=1048576bs->19:4
15:12:03->wait=10007ms,read=1011ms,total=1048576bs->16:3
15:12:03->wait=10006ms,read=1017ms,total=1048576bs->29:2
15:12:03->wait=10010ms,read=1022ms,total=1048576bs->28:1
可以看到服务器端确实为每个连接创建一个线程,共创建了20个线程。
客户端进入休眠约5-10秒,模拟连接上数据不就绪,服务器端线程在等待,等待时间约5-10秒。
客户端陆续结束休眠,往连接上写入1M数据,服务器端开始读取数据,整个读取过程约1秒。
可以看到,服务器端的工作线程会把时间花在“等待数据”和“读取数据”这两个过程上。
这有两个不好的地方:
一是有很多客户端同时发起请求的话,服务器端要创建很多的线程,可能会因为超过了上限而造成崩溃。
二是每个线程的大部分时光中都是在阻塞着,无事可干,造成极大的资源浪费。
多路复用技术 程序
在通信方面,多种信号或数据(从宏观上看)交织在一起,使用同一条传输通道进行传输。
这样做的目的,一方面可以充分利用通道的传输能力,另一方面自然是省时省力省钱啦。
多路指的是多种不同的信号或数据或其它事物,复用指的是共用同一个物理链路或通道或载体。
多路复用技术是一种一对多的模型,“多”的这一方复用了“一”的这一方。
其实这个模式在Java里早就有了,就是Java NIO,这里的大写字母“N”是单词“New”,即“新”的意思,主要是为了和上面的“一对一”进行区分。
现在需要把Socket交互的过程再稍微细化一些。客户端先请求连接,connect,服务器端然后接受连接,accept,然后客户端再向连接写入数据,write,接着服务器端从连接上读出数据,read。
和打电话的场景一样,主叫拨号,connect,被叫接听,accept,主叫说话,speak,被叫聆听,listen。主叫给被叫打电话,说明主叫找被叫有事,所以被叫关注的是接通电话,听对方说。
客户端主动向服务器端发起请求,说明客户端找服务器端有事,所以服务器端关注的是接受请求,读取对方传来的数据。这里把接受请求,读取数据称为服务器端感兴趣的操作。
在Java NIO中,接受请求的操作,用OP_ACCEPT表示,读取数据的操作,用OP_READ表示。
Java NIO
1、定义一个选择器,Selector。
2、定义一个服务器端套接字通道,ServerSocketChannel,并配置为非阻塞的。
3、将套接字通道注册到选择器上,并把感兴趣的操作设置为OP_ACCEPT。
4、进入死循环,选择器不时的进行选择。
5、选择器终于选择出了通道,发现通道是需要Acceptable的。
6、于是服务器端套接字接受了这个通道,开始处理。
7、把新接受的通道配置为非阻塞的,并把它也注册到了选择器上,该通道感兴趣的操作为OP_READ。
8、选择器继续不时的进行选择着。
9、选择器终于又选择出了通道,这次发现通道是需要Readable的。
10、把这个通道交给了一个新的工作线程去处理。
11、这个工作线程处理完后,就被回收了,可以再去处理其它通道。
12、选择器继续着重复的选择工作,不知道什么时候是个头。
public class NioServer {
static int clientCount = 0;
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress("localhost", 8080));
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel();
SocketChannel sc = null;
while ((sc = ssc1.accept()) != null) {
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
}
} else if (key.isReadable()) {
//先将“读”从感兴趣操作移出,待把数据从通道中读完后,再把“读”添加到感兴趣操作中
//否则,该通道会一直被选出来
key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ));
processWithNewThread((SocketChannel)key.channel(), key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void processWithNewThread(SocketChannel sc, SelectionKey key) {
Runnable run = () -> {
counter.incrementAndGet();
try {
String result = readBytes(sc);
//把“读”加进去
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get());
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
counter.decrementAndGet();
};
new Thread(run).start();
}
static String readBytes(SocketChannel sc) throws Exception {
long start = 0;
int total = 0;
int count = 0;
ByteBuffer bb = ByteBuffer.allocate(1024);
//开始读数据的时间
long begin = System.currentTimeMillis();
while ((count = sc.read(bb)) > -1) {
if (start < 1) {
//第一次读到数据的时间
start = System.currentTimeMillis();
}
total += count;
bb.clear();
}
//读完数据的时间
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}
static String time() {
return sdf.format(new Date());
}
}
时间->IP:Port->主线程Id:当前连接数
16:34:39->127.0.0.1:56105->1:1
16:34:39->127.0.0.1:56106->1:2
16:34:39->127.0.0.1:56107->1:3
16:34:39->127.0.0.1:56108->1:4
16:34:39->127.0.0.1:56109->1:5
16:34:39->127.0.0.1:56110->1:6
16:34:39->127.0.0.1:56111->1:7
16:34:39->127.0.0.1:56112->1:8
16:34:39->127.0.0.1:56113->1:9
16:34:39->127.0.0.1:56114->1:10
16:34:39->127.0.0.1:56115->1:11
16:34:39->127.0.0.1:56116->1:12
16:34:39->127.0.0.1:56117->1:13
16:34:39->127.0.0.1:56118->1:14
16:34:39->127.0.0.1:56119->1:15
16:34:39->127.0.0.1:56120->1:16
16:34:39->127.0.0.1:56121->1:17
16:34:39->127.0.0.1:56122->1:18
16:34:39->127.0.0.1:56123->1:19
16:34:39->127.0.0.1:56124->1:20
时间->等待数据的时间,读取数据的时间,总共读取的字节数->线程Id:当前线程数
16:34:45->wait=1ms,read=1018ms,total=1048576bs->11:5
16:34:45->wait=0ms,read=1054ms,total=1048576bs->10:5
16:34:45->wait=0ms,read=1072ms,total=1048576bs->13:6
16:34:45->wait=0ms,read=1061ms,total=1048576bs->14:5
16:34:45->wait=0ms,read=1140ms,total=1048576bs->12:4
16:34:46->wait=0ms,read=1001ms,total=1048576bs->15:5
16:34:46->wait=0ms,read=1062ms,total=1048576bs->17:6
16:34:46->wait=0ms,read=1059ms,total=1048576bs->16:5
16:34:47->wait=0ms,read=1001ms,total=1048576bs->19:4
16:34:47->wait=0ms,read=1001ms,total=1048576bs->20:4
16:34:47->wait=0ms,read=1015ms,total=1048576bs->18:3
16:34:47->wait=0ms,read=1001ms,total=1048576bs->21:2
16:34:48->wait=0ms,read=1032ms,total=1048576bs->22:4
16:34:49->wait=0ms,read=1002ms,total=1048576bs->23:3
16:34:49->wait=0ms,read=1001ms,total=1048576bs->25:2
16:34:49->wait=0ms,read=1028ms,total=1048576bs->24:4
16:34:50->wait=0ms,read=1008ms,total=1048576bs->28:4
16:34:50->wait=0ms,read=1033ms,total=1048576bs->27:3
16:34:50->wait=1ms,read=1002ms,total=1048576bs->29:2
16:34:50->wait=0ms,read=1001ms,total=1048576bs->26:2
服务器端接受20个连接,创建20个通道,并把它们注册到选择器上,此时不需要额外线程。
当某个通道已经有数据时,才会用一个线程来处理它,所以,线程“等待数据”的时间是0,“读取数据”的时间还是约1秒。
因为20个通道是陆陆续续有数据的,所以服务器端最多时是6个线程在同时运行的,换句话说,用包含6个线程的线程池就可以了。
处理同样的20个请求,一个需要用20个线程,一个需要用6个线程,节省了70%线程数。
这种处理模式就是被称为的多路复用I/O,多路指的是多个Socket通道,复用指的是只用一个线程来管理它们。
Java AIO 程序
1、初始化一个AsynchronousServerSocketChannel对象,并开始监听
2、通过accept方法注册一个“完成处理器”的接受连接回调,即CompletionHandler,用于在接受到连接后的相关操作。
3、当客户端连接过来后,由系统来接受,并创建好AsynchronousSocketChannel对象,然后触发该回调,并把该对象传进该回调,该回调会在Worker线程中执行。
4、在接受连接回调里,再次使用accept方法注册一次相同的完成处理器对象,用于让系统接受下一个连接。就是这种注册只能使用一次,所以要不停的连续注册,人家就是这样设计的。
5、在接受连接回调里,使用AsynchronousSocketChannel对象的read方法注册另一个接受数据回调,用于在接受到数据后的相关操作。
6、当客户端数据过来后,由系统接受,并放入指定好的ByteBuffer中,然后触发该回调,并把本次接受到的数据字节数传入该回调,该回调会在Worker线程中执行。
7、在接受数据回调里,如果数据没有接受完,需要再次使用read方法把同一个对象注册一次,用于让系统接受下一次数据。这和上面的套路是一样的。
8、客户端的数据可能是分多次传到服务器端的,所以接受数据回调会被执行多次,直到数据接受完为止。多次接受到的数据合起来才是完整的数据,这个一定要处理好。
9、关于ByteBuffer,要么足够的大,能够装得下完整的客户端数据,这样多次接受的数据直接往里追加即可。要么每次把ByteBuffer中的数据移到别的地方存储起来,然后清空ByteBuffer,用于让系统往里装入下一次接受的数据。
注:如果出现ByteBuffer空间不足,则系统不会装入数据,就会导致客户端数据总是读不完,极有可能进入死循环。
启动服务器端代码,使用同一个客户端代码,按相同的套路发20个请求,结果如下:
public class AioServer {
static int clientCount = 0;
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
assc.bind(new InetSocketAddress("localhost", 8080));
//非阻塞方法,其实就是注册了个回调,而且只能接受一个连接
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel asc, Object attachment) {
//再次注册,接受下一个连接
assc.accept(null, this);
try {
InetSocketAddress rsa = (InetSocketAddress)asc.getRemoteAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
} catch (Exception e) {
}
readFromChannelAsync(asc);
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
//不让主线程退出
synchronized (AioServer.class) {
AioServer.class.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void readFromChannelAsync(AsynchronousSocketChannel asc) {
//会把数据读入到该buffer之后,再触发工作线程来执行回调
ByteBuffer bb = ByteBuffer.allocate(1024*1024*1 + 1);
long begin = System.currentTimeMillis();
//非阻塞方法,其实就是注册了个回调,而且只能接受一次读取
asc.read(bb, null, new CompletionHandler<Integer, Object>() {
//从该连接上一共读到的字节数
int total = 0;
/**
* @param count 表示本次读取到的字节数,-1表示数据已读完
*/
@Override
public void completed(Integer count, Object attachment) {
counter.incrementAndGet();
if (count > -1) {
total += count;
}
int size = bb.position();
System.out.println(time() + "->count=" + count + ",total=" + total + "bs,buffer=" + size + "bs->" + Thread.currentThread().getId() + ":" + counter.get());
if (count > -1) {//数据还没有读完
//再次注册回调,接受下一次读取
asc.read(bb, null, this);
} else {//数据已读完
try {
asc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
counter.decrementAndGet();
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
long end = System.currentTimeMillis();
System.out.println(time() + "->exe read req,use=" + (end -begin) + "ms" + "->" + Thread.currentThread().getId());
}
static String time() {
return sdf.format(new Date());
}
}
时间->IP:Port->回调线程Id:当前连接数
17:20:47->127.0.0.1:56454->15:1
时间->发起一个读请求,耗时->回调线程Id
17:20:47->exe read req,use=3ms->15
17:20:47->127.0.0.1:56455->15:2
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56456->15:3
17:20:47->exe read req,use=0ms->15
17:20:47->127.0.0.1:56457->16:4
17:20:47->127.0.0.1:56458->15:5
17:20:47->exe read req,use=1ms->16
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56460->15:6
17:20:47->127.0.0.1:56459->17:7
17:20:47->exe read req,use=0ms->15
17:20:47->127.0.0.1:56462->15:8
17:20:47->127.0.0.1:56461->16:9
17:20:47->exe read req,use=1ms->15
17:20:47->exe read req,use=0ms->16
17:20:47->exe read req,use=0ms->17
17:20:47->127.0.0.1:56465->16:10
17:20:47->127.0.0.1:56463->18:11
17:20:47->exe read req,use=0ms->18
17:20:47->127.0.0.1:56466->15:12
17:20:47->exe read req,use=1ms->16
17:20:47->127.0.0.1:56464->17:13
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56467->18:14
17:20:47->exe read req,use=2ms->17
17:20:47->exe read req,use=1ms->18
17:20:47->127.0.0.1:56468->15:15
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56469->16:16
17:20:47->127.0.0.1:56470->18:17
17:20:47->exe read req,use=1ms->18
17:20:47->exe read req,use=1ms->16
17:20:47->127.0.0.1:56472->15:18
17:20:47->127.0.0.1:56473->19:19
17:20:47->exe read req,use=2ms->15
17:20:47->127.0.0.1:56471->17:20
17:20:47->exe read req,use=1ms->19
17:20:47->exe read req,use=1ms->17
时间->本次接受到的字节数,截至到目前接受到的字节总数,buffer中的字节总数->回调线程Id:当前线程数
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=230188,total=295724bs,buffer=295724bs->12:1
17:20:52->count=752852,total=1048576bs,buffer=1048576bs->14:3
17:20:52->count=131072,total=196608bs,buffer=196608bs->17:2
。。。。。。。。。。。。。。。。。。。。。。
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
系统接受到连接后,在工作线程中执行了回调。并且在回调中执行了read方法,耗时是0,因为只是注册了个接受数据的回调而已。
系统接受到数据后,把数据放入ByteBuffer,在工作线程中执行了回调。并且回调中可以直接使用ByteBuffer中的数据。
接受数据的回调被执行了多次,多次接受到的数据加起来正好等于客户端传来的数据。
因为系统是接受到数据后才触发的回调,所以服务器端最多时是3个线程在同时运行回调的,换句话说,线程池包含3个线程就可以了。
处理同样的20个请求,一个需要用20个线程,一个需要用6个线程,一个需要3个线程,又节省了50%线程数。
注:不用特别较真这个比较结果,这里只是为了说明问题而已。哈哈。
三种处理方式的对比
第一种是阻塞IO,阻塞点有两个,等待数据就绪的过程和读取数据的过程。
第二种是阻塞IO,阻塞点有一个,读取数据的过程。
第三种是非阻塞IO,没有阻塞点,当工作线程启动时,数据已经(被系统)准备好可以直接用了。
可见,这是一个逐步消除阻塞点的过程。
参考文献
https://www.liangzl.com/get-article-detail-121935.html
https://blog.csdn.net/coolgw2015/article/details/79719328
https://www.cnblogs.com/lixinjie/archive/2019/06/16/11033062.html