Netty学习:二、核心组件

298次阅读
没有评论

共计 5063 个字符,预计需要花费 13 分钟才能阅读完成。

内容目录

Netty核心组件

  Netty核心组件包括以下内容:

  • Channel:Netty中的基本数据传输抽象,用于网络I/O操作。它代表一个打开的连接,可以是网络套接字、文件或其它能够进行I/O操作的实体;
  • EventLoop: Netty中的事件处理循环,负责处理与Channel相关的I/O操作,每个EventLoop在自己的线程中运行,并服务于一个或多个Channel。
  • ChannelPipeline: Channel中的拦截器链,包含处理Channel中数据流的多个处理器Handler。当Channel中有数据进行读写时,这些数据就会经过Pipeline 中的各个处理器进行处理和传递。
  • ChannelHandler: 用来处理或拦截Channel中I/O事件或操作的接口。它是ChannelPipeline链上的每个节点。
  • ChannelFuture: Netty中异步操作后的结果抽象,表示一个尚未完成的I/O操作,可以通过ChannelFuture的回调机制来处理操作完成时的行为。
  • Bootstrap: Netty中的引导程序,负责组装上面的这些组件进行配置,然后启动网络程序。
  • ByteBuf:Netty中用于处理数据缓冲的抽象。网络的I/O数据都是可以用该抽象获取。

Netty学习:二、核心组件

4.1 Channel

  Channel是Java NIO/Netty的一个基础结构。它代表一个到实体的开放连接,允许我们从连接中执行读操作和写操作。可以简单的将Channel看作是传入或传出数据的载体。因此它可以被打开,也可以被关闭。

  在Netty中,常见的Channel有以下几个:

  • NioSocketChannel: 异步的TCP客户端;
  • NioServerSocketChannel: 异步TCP服务端;
  • NioDatagramChannel: 异步的UDP连接;
  • OioSocketChannel:同步的TCP客户端;
  • OioServerSocketChannel: 同步的TCP服务端;
  • OioDatagamChannel: 同步的UDP连接
  • EpollSocketChannel: 基于Linux上的Epoll I/O机制的Socket客户端通道,比NIO性能更高,只在Linux系统上可用;
  • EpollServerSocketChannel: 上同,服务端通道。

4.2 EventLoop

  EventLoop是Netty中非常核心的概念,它是一个主要用于处理I/O操作的组件,负责接收和处理事件。在EventLoop中,实现了事件处理的循环,并为每个Channel处理相关的I/O操作,包括读、写和连接事件。

  一般情况下,每个EventLoop关联着一个或多个Channel,并且是在单独的线程中运行,这样可以确保同一个Channel的事件不会并发处理,从而保证了线程安全

4.2.1 EventLoopGroup

  EventLoopGroup:事件循环组,通常用于管理多个EventLoop,在创建Server或Client时,需要指定一个EventLoopGroup来处理所有的I/O操作。常见的EventLoopGroup有:

  • NioEventLoopGroup: 基于NIO的实现,适合TCP和UDP连接;
  • EpollEventLoopGroup: 基于Linux的Epoll I/O模型,适用于高性能的服务器应用,仅Linux上使用。

  如果我们想定时执行一下任务时,可以调用schedule()方法,如果想执行非定时任务时,可以调用execute()方法。

Netty学习:二、核心组件

4.2.2 主从多线程事件循环驱动组:

  在主从多线程模型中,可能还有BossWorker两类线程组:

  • Boss线程组:主要负责监听新的连接,接收新的连接请求,创建新的Channel,然后将Channel分配给Worker线程组。使用时,一般线程数设置为1或2。
  • Worker线程组: 主要负责处理已建立好的Channel的I/O操作,包括读取数据、写入数据等。使用时,需要根据具体需要设置值以提高并发处理能力。

4.3 ChannelPipeline

ChannelPipeline一般与Channel进行关联,一个Channel对应一个ChannelPipeline。负责处理Channel中的所有I/O操作和事件。ChannelPipeline通过将一系列的处理器(Handler)串联起来组成一条处理链,当数据流入时,会按照处理链中的Handler的顺序进行处理。所以,ChannelPipeline也是ChannelHandler的容器,内部包含由ChannelHandler组成的一条流水处理线。

Netty学习:二、核心组件

4.4 ChannelHandler

  ChannelHandler用于处理不同类型的I/O事件和操作。这也是我们使用Netty时要实现具体业务的接口。在Netty中,ChannelHandler主要又分为两类:

Netty学习:二、核心组件

  • ChannelInboundHandler: 用于处理接收到的数据(入站数据),例如一些数据解码,业务逻辑处理等;
  • ChannelOutboundHandler: 用于处理发送的数据(出站数据),例如一些数据编码,日志业务等;

Netty学习:二、核心组件

  常用的一些ChannelHandler:

  • 编码器/解码器:
    • StringDecoder:将接收到的字节流转换为字符串(入站)。
    • StringEncoder:将字符串转换为字节流(出站)。
    • ByteArrayDecoder:将接收到的字节流解码为字节数组(入站)。
    • ByteArrayEncoder:将字节数组编码为字节流(出站)。
  • 业务逻辑处理器:
    • SimpleChannelInboundHandler:一个方便的抽象类,简化入站处理的实现。可以直接处理入站的消息类型,并自动调用 ctx.fireChannelRead() 方法。
  • 日志记录:
    • LoggingHandler:用于记录所有的入站和出站事件,适合用于调试和监控
  • 管道和代理:
    • ChannelInitializer:通常用于设置新的 ChannelChannelPipeline 的处理器。在服务器中,常用来配置每个连接的处理逻辑。
  • HTTP 相关处理器:
    • HttpObjectAggregator:将 HTTP 消息聚合成完整的请求或响应。适合处理 HTTP POST 请求的情况。
    • HttpRequestDecoder:将字节流解码为 HTTP 请求(入站)。
    • HttpResponseEncoder:将 HTTP 响应编码为字节流(出站)。
  • WebSocket 相关处理器
    • WebSocketServerProtocolHandler:用于处理 WebSocket 连接的握手和消息处理。

4.5 ChannelFuture

  ChannelFuture在Netty中是一个用于表示异步的I/O操作结果的接口。我们可以使用它来处理异步操作的结果,包括判断是否成功,是否出现异常等信息。
主要方法如下:

  • isDone():检查操作是否已经完成(无论是成功还是失败)。
  • isSuccess():检查操作是否成功。
  • isCancelled():检查操作是否被取消。
  • get():阻塞等待操作完成,并返回结果。
  • cause():获取导致操作失败的异常。
  • addListener(ChannelFutureListener listener):注册一个监听器,在操作完成时被通知。
  • sync():阻塞当前线程,直到操作完成。
  • syncUninterruptibly():阻塞当前线程,即使当前线程被中断也不会中断,直到操作完成。

4.6 Bootstrap

  Bootstrap是一个用于配置和启动网络应用程序的一个辅助类。它提供了一种简单的方式来创建和配置网络连接。在Netty中,Bootstrap分为两类:

  • Bootstrap: 用于客户端应用程序;
  • ServerBootstrap: 用于服务端应用程序;

  使用Bootstrap可以方便我们配置Netty程序中Channel、EventLoop、ChannelHandler等。

4.7 ByteBuf

  ByteBuf时Netty中处理二进制数据的主要抽象,旨在替代Java NIO中标准的ByteBuffer。ByteBuf可以直接通过JVM堆内存和直接内存进行内存分配,有效提高I/O性能。同时还是有读索引和写索引来管理数据的读取和写入,有效避免内存复制,从而提高了性能。

  一般通过Unpooled来创建ByteBuf。Unpooled 是 Netty 提供的一个工具类,主要用于创建非池化的 ByteBuf 实例。Unpooled 可以方便地创建不同类型的 ByteBuf,并通常用于需要即时分配内存而不考虑性能优化的场景。非池化的含义就是创建出来的ByteBuf不会添加到Netty的内存池中。

总结

再回过头来看我们的EchoServer代码:

package me.pgthinker;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

/**
 * @Project: me.pgthinker
 * @Author: pgthinker
 * @GitHub: https://github.com/ningning0111
 * @Date: 2024/9/1 14:27
 * @Description:
 */
public class EchoServer {
    private final static Integer port = 8778;

    public static void main(String[] args) throws InterruptedException {
        new ServerBootstrap()
                .group(new NioEventLoopGroup()) // 配置事件循环组
                .channel(NioServerSocketChannel.class) // 指定Channel类型
                .childHandler(new ChannelInitializer<SocketChannel>() { // 定义Channel的Pipeline中的Handler
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // 定义具体的Handler
                        socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
                                ByteBuf resp = Unpooled.copiedBuffer("服务端返回的消息:" + byteBuf.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8);
                                channelHandlerContext.writeAndFlush(resp);
                            }

                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                ctx.writeAndFlush(Unpooled.copiedBuffer("连接成功!\n", CharsetUtil.UTF_8));
                            }
                        });
                    }

                }).bind(port); // 绑定端口

    }

}

 其中的逻辑也就不再难以理解。

正文完
 
PG Thinker
版权声明:本站原创文章,由 PG Thinker 2024-09-01发表,共计5063字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
热评文章
Rust 编译并使用 Protobuf

Rust 编译并使用 Protobuf

内容目录 Rust 编译并使用 Protobuf 必要的依赖库 prost: https://github.c...