共计 4156 个字符,预计需要花费 11 分钟才能阅读完成。
2.1 Java NIO介绍
在传统的Java I/O模型中,I/O操作是以BIO的方式进行的。这就意味着,当一个线程执行I/O操作时,它会被阻塞直到操作完成。这种阻塞模型在高并发连接时可能会导致性能瓶颈,因为需要为每个连接建立一个线程,而线程的创建和切换都是由开销的。
为了解决上述问题,自JDK 1.4(JDK 4)起引入了一种新的I/O模型-NIO。NIO弥补了BIO的不足,它在Java中提供了非阻塞、面向缓冲、基于通道的I/O,可以使用少量的线程来处理多个连接,大大提高了I/O效率和并发。
2.2 Java NIO核心三大组件
Java NIO中主要包括三个核心组件:
- Channel(通道):Channel是NIO中进行I/O操作的核心组件,它代表了一个连接到I/O设备的通道,Channel主要分为两类:
- FileChannel:用于文件IO操作的通道;
- SelectableChannel:用于网络IO操作的通道;
- Buffer(缓冲区):Buffer时NIO中用于存储数据的缓冲区,所有NIO的I/O操作都是通过Buffer进行的。Buffer可以存储不同类型的数据,例如byte、char、short、int、long、float、double。
- Selector(选择器):Selector是NIO中用于监视多个Channel事件的组件,它可以同时监视多个Channel的状态,并只处理那些已经就绪的Channel。Selector可以用于实现非阻塞I/O,从而提高应用程序的性能。
2.3 Java NIO的工作方式
Java NIO的工作方式就是通过核心三大组件实现:
- 应用程序通过Channel进行I/O操作;
- 数据通过Buffer进行存储;
- Selector监听多个Channel状态
2.4 Java NIO Buffer
Buffer是一个对象,它包含一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库与原I/O的一个重要区别。在面向流的I/O中,可以将数据直接卸乳或者将数据直接读到Stream对象中。
在NIO库中,所有数据都是用缓冲区(Buffer)处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据都是通过缓冲区进行操作。
缓冲区本质上是一个数组,通常是字节数组ByteBuffer
,它提供了对数据的结构化访问以及维护读写位置等信息。
除了字节类型的缓冲区外,Java基本类型都有对应的一种缓冲区。
2.5 Java NIO Channel
Channel是一个通道,可以通过它读取和写入数据,它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动,而且通道可以用于读、写或读写同时进行。
因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。
2.6 Java NIO Selector
Selector,多路复用器,是Java NIO编程的基础。Selector提供选择已经就绪的任务的能力,简单来讲:Selector会不断地轮询 注册在自己身上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个Selector可以同时轮询多个Channel,由于Java NIO使用了epoll()
代替传统的select
实现,所以它并没有最大连接句柄的限制,这就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
Selector可以监听以下四种事件类型:
SelectionKey.OP_ACCEPT
:通道接受连接的事件,通常用于ServerSocketChannel
,即服务端;SelectionKey.OP_CONNECT
:通道完成连接的事件,通常用于SocketChannel
,即客户端;SelectionKey.OP_READ
:通道准备好进行读取的事件,即有数据可读。SelectionKey.OP_WRITE
:通道准备好进行写入的事件,即有数据可写入。
2.7 Java NIO 案例
本节将通过NIO实现一个简单的服务端案例,该服务端通过监听本地某端口,将所有连接到本端口的网络信息转发回去。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class DemoService {
private static final int DEFAULT_PORT = 8765;
public static void main(String[] args) {
int port = DEFAULT_PORT;
if(args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
}catch (NumberFormatException e){
System.out.println("未指定端口,将采用默认端口["+DEFAULT_PORT+"]启动");
}
}
Service service = new Service(port);
new Thread(service).run();
}
private static class Service implements Runnable{
private boolean stop = false;
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Service(int port){
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 注册连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端已启动["+port+"]!");
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
while (!stop){
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try {
handle(key);
}catch (Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch (IOException e){
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
// 处理轮询到的事件
private void handle(SelectionKey key) throws IOException {
// 处理监听事件
if(key.isAcceptable()){
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
System.out.println("连接成功");
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
}
// 处理可读数据就绪事件 即客户端发来了信息
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(new String("服务端信息:").getBytes());
SocketChannel channel = (SocketChannel) key.channel();
int read = channel.read(buffer);
if(read > 0){
buffer.flip();
buffer.put(0,new String("服务端消息:").getBytes());
// 写入数据
channel.write(buffer);
buffer.clear();
}else if(read == -1){
channel.close();
}
}
}
public void stop(){
stop = true;
}
}
}
启动服务端,通过telnet localhost 8765
连接服务端,并发送信息进行测试: