正文
多路复用器java代码 多路复用器java代码怎么写
小程序:扫一扫查出行
【扫一扫了解最新限行尾号】
复制小程序
【扫一扫了解最新限行尾号】
复制小程序
Selector选择器
Selector 一般称 为 选择器 ,也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接
学习Selector之前,需要先了解阻塞IO和非阻塞IO的区别。以烧水为例,出场人物:老王,普通水壶和响水壶。
通过以上故事,我们得到了同步和异步,阻塞和非阻塞几个概念:
IO 包中的类是典型的阻塞设计,这无法发挥的性能,就需要使用多线程来解决阻塞问题 。
在 NIO 中,Selector的作用就是用来轮询每个注册的Channel,一旦发现Channel有注册的事件发生,便获取事件然后进行处理 。
示例
Channel 必须是非阻塞的 所以 FileChannel 不适用 Selector,因为 FileChannel 不能切换为非阻塞模式,更准确的来说是因为 FileChannel没有继承SelectableChannel。Socket channel可以正常使用。
SelectionKey 可以称为 interest集合 。 意思是在 通过Selector监听Channel时对什么事件感兴趣 。通道触发了一个事件意思是该事件已经就绪。
如果需要不止一种事件,使用或运算符即可,如下:
一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系
java网络io模型有几种
#BIO---Blocking IO
- 每个socket一个线程,读写时线程处于阻塞状态。
优点:实现简单
缺点:无法满足高并发,高接入的需求
- 不使用线程池的BIO模型,除多路复用器java代码了无法满足高并发需求外,由于需要为每个请求创建一个线程,还可能因为接入大量不活跃连接而耗尽服务器资源。
- 使用线程池的BIO模型,虽然控制了线程数量,但由于其本质上读写仍是阻塞的,仍无法满足高并发需求。
#NIO---Non-Blocking IO(非阻塞IO)
##非阻塞IO和多路复用
非阻塞IO和多路复用实际上是两个不用的概念,由于两者通常结合在一起使用,因此两者往往被混为一谈。下面我将试着分清这两个概念:
###非阻塞IO
与BIO相对应,非阻塞IO的读写方法无论是否有数据都立即返回,因此可以通过轮询方式来实现,但轮询方式的效率并不比BIO有显著提高,因为每个连接仍然需要占用一个线程。下面是轮询方式实现的IO模式图:
###多路复用
- 多路复用结合非阻塞IO能够明显提高IO的效率,这也是Java1.4把非阻塞IO和多路复用同时发布的原因。
- 多路复用的核心是多路复用器(Selector),它是需要操作系统底层支持的,简单的说,就是进程把多个socket和它们关心的事件(比如连接请求或数据已准备好)都注册在多路复用器上,操作系统会在事件发生时通知多路复用器,这样进程就可以通过多路复用器知道在那个socket上发生了什么时间,从而进行对应的处理。
- 多路复用的优点在于只需要一个线程监测(阻塞或轮询方式均可)多路选择器的状态,只有在有事件需要发生时才会真正的创建线程进行处理,因此更适合高并发多接入的应用环境。
- 在Linux系统下,多路复用的底层实现是epoll方法,与select/poll的顺序扫描不同,epoll采用效率更高的事件驱动方式,而且epoll方式并没有socket个数限制。
##BIO和NIO的比较
- BIO适用于连接长期保持的应用,比如一个复杂系统中模块之间通过长连接来进行通信。
- NIO加多路复用的模式更适合短连接、高并发、多接入的情形,比如网络服务器。
##NIO网络编程的常用接口
##Reactor模式
Reactor模式用于解决事件分发处理的问题,Handler把自己的channel和关注的事件注册到Selector中,当对应的事件发生在自己的channel上时,对应的handler就会得到通知并进行处理。
- 单线程的Reactor
消息的分发、读写、处理都在一个线程中处理,是Reactor最简单的实现方式,如果消息的处理需要较长时间,会影响效率。
```java
//Reactor类,负责分发事件并调用对应的handler
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
//Reactor初始化
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false); //必须配置为非阻塞
//Acceptor会在Reactor初始化时就注册到Selector中,用于接受connect请求
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor()); //attach callback object, Acceptor
}
//分发消息并调用对应的handler
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象
if (r != null)
r.run();
}
//Acceptor也是一个handler,负责创建socket并把新建的socket也注册到selector中
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}
catch(IOException ex) { /* ... */ }
}
}
}
//Concrete Handler:用于收发和处理消息。
//在当前的实现中,使用Runnable接口作为每个具体Handler的统一接口
//如果在处理时需要参数和返回值,也可以为Handler另外声明一个统一接口来代替Runnable接口
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c; c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this); //将Handler作为callback对象
sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key
}
}
//上面 的实现用Handler来同时处理Read和Write事件, 所以里面出现状态判断
//我们可以用State-Object pattern来更优雅的实现
class Handler { // ...
public void run() { // initial state is reader
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender()); //状态迁移, Read后变成write, 用Sender作为新的callback对象
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
}
```
- 多线程Reacotr
处理消息过程放在其多路复用器java代码他线程中执行
```java
class Handler implements Runnable {
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer()); //使用线程pool异步执行
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE); //process完,开始等待write事件
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}
```
- 使用多个selector
mainReactor只负责处理accept并创建socket,多个subReactor负责处理读写请求
```java
Selector[] selectors; //subReactors集合, 一个selector代表一个subReactor
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept(); //主selector负责accept
if (connection != null)
new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
if (++next == selectors.length) next = 0;
}
}
```
#AIO
AIO是真正的异步IO,它于JDK1.7时引入,它和NIO的区别在于:
- NIO仍然需要一个线程阻塞在select方法上,AIO则不需要
- NIO得到数据准备好的消息以后,仍然需要自己把消息复制到用户空间,AIO则是通过操作系统的支持把数据异步复制到用户空间以后再给应用进程发出信号。
IO多路复用
阻塞IO只能阻塞一个IO操作,IO复用模型能阻塞多个IO操作,所以才叫多路复用
读数据 :
直到数据全拷贝至User Space后才返回
不断去Kernel做Polling,询问比如读操作是否完成,没完成则read()操作会返回EWOUDBLOCK,需要过一会再尝试执行一次read()。该模式会消耗大量CPU
之前等待时间主要消耗在等数据到达上。IO Multiplexing则是将等待数据到来和读取实际数据两个事情分开,好处是通过select()等IO Multiplexing的接口一次可以等待在多个Socket上。select()返回后,处于Ready状态的Socket执行读操作时候也会阻塞,只是只阻塞将数据从Kernel拷贝到User的时间
首先注册处理函数到SIGIO信号上,在等待数据到来过程结束后,系统触发SGIO信号,之后可以在信号处理函数中执行读数据操作,再唤醒Main Thread或直接唤醒Main Thread让它完成数据读取。整个过程没有一次阻塞。
问题:TCP下,连接断开/可读/可写等都会产生Signal,并且Signal没有提供好的方法去区分这些Signal到底为什么被触发
AIO是注册一个读任务,直到读任务完全完成后才会通知应用层。AIO是由内核通知IO操作什么时候完成,信号驱动IO是由内核告知何时启动IO操作
也存在挺多问题,比如如何去cancel一个读任务
除了AIO是异步IO,其他全是同步IO
fd_set: 一个long类型的数组,每一位可以表示一个文件描述符
问题 :
返回条件与select一样。
fds还是关注的描述符列表。poll将events和reevents分开了,所以如果关注的events没有发生变化就可以重用fds,poll只修改rents不会动events。fds是个数组,不是fds_set,没有了上限。
相对于select,poll解决了fds长度上限问题,解决了监听描述符无法复用问题,但仍需在poll返回后遍历fds去找ready的描述符,也要清理ready描述符对应的revents,Kernel也同样是每次poll调用需要去遍历fds注册监听,poll返回时拆除监听,也仍有惊群问题,无法动态修改描述符的问题。
使用步骤:
优点 :
缺点 :
changelist用于传递关心的event
nchanges用于传递changelist的大小
eventlist用于当有事件产生后,将产生的事件放在这里
nevents用于传递eventlist大小
timeout 超时时间
kqueue高级的地方在于,它监听的不一定非要是Socket,不一定非要是文件,可以是一系列事件,所以struct kevent内参数叫filter,用于过滤出关心的事件。
kqueue有epoll所有优点,还能通过changelist一次注册多个关心的event,不需要像epoll那样每次调用epoll_ctl去配置
当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里。
如此,一棵红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。
Epoll有两种触发模式,一种Edge Trigger简称ET,一种Level Trigger简称LT。每个使用epoll_ctl注册在epoll描述符上的被监听的描述符都能单独配置自己的触发模式。
从使用角度的区别:ET模式下当一个文件描述符Ready后,需要以Non-Blocking方式一直操作这个FD直到操作返回EAGAIN错误位置,期间Ready这个事件只会触发epoll_wait一次返回。LT模式,如果FD上的事件一直处在Ready状态没处理完,则每次调用epoll_wait都会立即返回
场景:
Java的NIO提供了Selector类,用于跨平台的实现Socket Polling,即IO多路复用。BSD系统上对应的是Kqueue,Window上对应的是Select,Linux上对应的是LT的Epoll(为了跨平台统一,Windows上背后是Select,是LT的)
Selector的使用:
多路复用器java代码的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于多路复用器java代码怎么写、多路复用器java代码的信息别忘了在本站进行查找喔。