共计 5063 个字符,预计需要花费 13 分钟才能阅读完成。
Netty核心组件
Netty核心组件包括以下内容:
- Channel:Netty中的基本数据传输抽象,用于网络I/O操作。它代表一个打开的连接,可以是网络套接字、文件或其它能够进行I/O操作的实体;
- EventLoop: Netty中的事件处理循环,负责处理与Channel相关的I/O操作,每个EventLoop在自己的线程中运行,并服务于一个或多个Channel。
- ChannelPipeline: Channel中的拦截器链,包含处理Channel中数据流的多个处理器
Handler
。当Channel中有数据进行读写时,这些数据就会经过Pipeline 中的各个处理器进行处理和传递。 - ChannelHandler: 用来处理或拦截Channel中I/O事件或操作的接口。它是ChannelPipeline链上的每个节点。
- ChannelFuture: Netty中异步操作后的结果抽象,表示一个尚未完成的I/O操作,可以通过ChannelFuture的回调机制来处理操作完成时的行为。
- Bootstrap: Netty中的引导程序,负责组装上面的这些组件进行配置,然后启动网络程序。
- ByteBuf:Netty中用于处理数据缓冲的抽象。网络的I/O数据都是可以用该抽象获取。
4.1 Channel
Channel是Java NIO/Netty的一个基础结构。它代表一个到实体的开放连接,允许我们从连接中执行读操作和写操作。可以简单的将Channel看作是传入或传出数据的载体。因此它可以被打开,也可以被关闭。
在Netty中,常见的Channel有以下几个:
- NioSocketChannel: 异步的TCP客户端;
- NioServerSocketChannel: 异步TCP服务端;
- NioDatagramChannel: 异步的UDP连接;
- OioSocketChannel:同步的TCP客户端;
- OioServerSocketChannel: 同步的TCP服务端;
- OioDatagamChannel: 同步的UDP连接
- EpollSocketChannel: 基于Linux上的Epoll I/O机制的Socket客户端通道,比NIO性能更高,只在Linux系统上可用;
- EpollServerSocketChannel: 上同,服务端通道。
4.2 EventLoop
EventLoop是Netty中非常核心的概念,它是一个主要用于处理I/O操作的组件,负责接收和处理事件。在EventLoop中,实现了事件处理的循环,并为每个Channel
处理相关的I/O操作,包括读、写和连接事件。
一般情况下,每个EventLoop关联着一个或多个Channel,并且是在单独的线程中运行,这样可以确保同一个Channel的事件不会并发处理,从而保证了线程安全。
4.2.1 EventLoopGroup
EventLoopGroup
:事件循环组,通常用于管理多个EventLoop,在创建Server或Client时,需要指定一个EventLoopGroup来处理所有的I/O操作。常见的EventLoopGroup有:
- NioEventLoopGroup: 基于NIO的实现,适合TCP和UDP连接;
- EpollEventLoopGroup: 基于Linux的Epoll I/O模型,适用于高性能的服务器应用,仅Linux上使用。
如果我们想定时执行一下任务时,可以调用schedule()
方法,如果想执行非定时任务时,可以调用execute()
方法。
4.2.2 主从多线程事件循环驱动组:
在主从多线程模型中,可能还有Boss
和Worker
两类线程组:
- Boss线程组:主要负责监听新的连接,接收新的连接请求,创建新的Channel,然后将Channel分配给Worker线程组。使用时,一般线程数设置为1或2。
- Worker线程组: 主要负责处理已建立好的Channel的I/O操作,包括读取数据、写入数据等。使用时,需要根据具体需要设置值以提高并发处理能力。
4.3 ChannelPipeline
ChannelPipeline一般与Channel进行关联,一个Channel对应一个ChannelPipeline。负责处理Channel中的所有I/O操作和事件。ChannelPipeline通过将一系列的处理器(Handler)串联起来组成一条处理链,当数据流入时,会按照处理链中的Handler的顺序进行处理。所以,ChannelPipeline也是ChannelHandler的容器,内部包含由ChannelHandler组成的一条流水处理线。
4.4 ChannelHandler
ChannelHandler用于处理不同类型的I/O事件和操作。这也是我们使用Netty时要实现具体业务的接口。在Netty中,ChannelHandler主要又分为两类:
- ChannelInboundHandler: 用于处理接收到的数据(入站数据),例如一些数据解码,业务逻辑处理等;
- ChannelOutboundHandler: 用于处理发送的数据(出站数据),例如一些数据编码,日志业务等;
常用的一些ChannelHandler:
- 编码器/解码器:
- StringDecoder:将接收到的字节流转换为字符串(入站)。
- StringEncoder:将字符串转换为字节流(出站)。
- ByteArrayDecoder:将接收到的字节流解码为字节数组(入站)。
- ByteArrayEncoder:将字节数组编码为字节流(出站)。
- 业务逻辑处理器:
- SimpleChannelInboundHandler:一个方便的抽象类,简化入站处理的实现。可以直接处理入站的消息类型,并自动调用
ctx.fireChannelRead()
方法。
- SimpleChannelInboundHandler:一个方便的抽象类,简化入站处理的实现。可以直接处理入站的消息类型,并自动调用
- 日志记录:
- LoggingHandler:用于记录所有的入站和出站事件,适合用于调试和监控
- 管道和代理:
- ChannelInitializer:通常用于设置新的
Channel
的ChannelPipeline
的处理器。在服务器中,常用来配置每个连接的处理逻辑。
- ChannelInitializer:通常用于设置新的
- HTTP 相关处理器:
- HttpObjectAggregator:将 HTTP 消息聚合成完整的请求或响应。适合处理 HTTP POST 请求的情况。
- HttpRequestDecoder:将字节流解码为 HTTP 请求(入站)。
- HttpResponseEncoder:将 HTTP 响应编码为字节流(出站)。
- WebSocket 相关处理器
- WebSocketServerProtocolHandler:用于处理 WebSocket 连接的握手和消息处理。
4.5 ChannelFuture
ChannelFuture在Netty中是一个用于表示异步的I/O操作结果的接口。我们可以使用它来处理异步操作的结果,包括判断是否成功,是否出现异常等信息。
主要方法如下:
- isDone():检查操作是否已经完成(无论是成功还是失败)。
- isSuccess():检查操作是否成功。
- isCancelled():检查操作是否被取消。
- get():阻塞等待操作完成,并返回结果。
- cause():获取导致操作失败的异常。
- addListener(ChannelFutureListener listener):注册一个监听器,在操作完成时被通知。
- sync():阻塞当前线程,直到操作完成。
- syncUninterruptibly():阻塞当前线程,即使当前线程被中断也不会中断,直到操作完成。
4.6 Bootstrap
Bootstrap是一个用于配置和启动网络应用程序的一个辅助类。它提供了一种简单的方式来创建和配置网络连接。在Netty中,Bootstrap分为两类:
- Bootstrap: 用于客户端应用程序;
- ServerBootstrap: 用于服务端应用程序;
使用Bootstrap可以方便我们配置Netty程序中Channel、EventLoop、ChannelHandler等。
4.7 ByteBuf
ByteBuf时Netty中处理二进制数据的主要抽象,旨在替代Java NIO中标准的ByteBuffer。ByteBuf可以直接通过JVM堆内存和直接内存进行内存分配,有效提高I/O性能。同时还是有读索引和写索引来管理数据的读取和写入,有效避免内存复制,从而提高了性能。
一般通过Unpooled
来创建ByteBuf。Unpooled
是 Netty 提供的一个工具类,主要用于创建非池化的 ByteBuf
实例。Unpooled
可以方便地创建不同类型的 ByteBuf
,并通常用于需要即时分配内存而不考虑性能优化的场景。非池化的含义就是创建出来的ByteBuf不会添加到Netty的内存池中。
总结
再回过头来看我们的EchoServer代码:
package me.pgthinker;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
/**
* @Project: me.pgthinker
* @Author: pgthinker
* @GitHub: https://github.com/ningning0111
* @Date: 2024/9/1 14:27
* @Description:
*/
public class EchoServer {
private final static Integer port = 8778;
public static void main(String[] args) throws InterruptedException {
new ServerBootstrap()
.group(new NioEventLoopGroup()) // 配置事件循环组
.channel(NioServerSocketChannel.class) // 指定Channel类型
.childHandler(new ChannelInitializer<SocketChannel>() { // 定义Channel的Pipeline中的Handler
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 定义具体的Handler
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
ByteBuf resp = Unpooled.copiedBuffer("服务端返回的消息:" + byteBuf.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8);
channelHandlerContext.writeAndFlush(resp);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("连接成功!\n", CharsetUtil.UTF_8));
}
});
}
}).bind(port); // 绑定端口
}
}
其中的逻辑也就不再难以理解。