Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。
Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一种机制。
Reactor 包含以下三大组件:
其中:
Reactor 模式支持以下三大模型:
具体内容如下。
在单线程模型中,所有的事件处理操作都由单个 Reactor 实例在单个线程下完成。Reactor 负责监控事件、分发事件和执行事件处理程序(Handlers),如下图所示:
单线程模型的实现 Demo 如下:
// 假设有一个单线程的Reactor,负责监听、接收连接、读写操作
class SingleThreadReactor {
EventLoop eventLoop; // 单个事件循环线程
SingleThreadReactor() {
eventLoop = new EventLoop(); // 初始化单个事件循环
}
void start(int port) {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port)); // 绑定端口
eventLoop.execute(() -> { // 在事件循环中执行
while (true) {
SocketChannel clientSocket = serverSocket.accept(); // 接受连接
if (clientSocket != null) {
handleConnection(clientSocket); // 处理连接
}
}
});
eventLoop.run(); // 启动事件循环
}
void handleConnection(SocketChannel clientSocket) {
// 读写操作,这里简化处理
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (clientSocket.read(buffer) > 0) {
// 处理读取的数据
buffer.flip();
// 假设处理数据逻辑...
buffer.clear();
}
// 写操作逻辑类似
}
}
在多线程模型中,连接 Acceptor 和业务处理(Handlers)是由不同线程分开执行的,其中 Handlers 是由线程池(多个线程)来执行的,如下图所示:
多线程模型的实现 Demo 如下:
// 假设有两个线程,一个用于监听连接,一个用于处理连接后的操作
class MultiThreadReactor {
EventLoop acceptLoop;
EventLoop workerLoop;
MultiThreadReactor() {
acceptLoop = new EventLoop(); // 接收连接的线程
workerLoop = new EventLoop(); // 处理连接的线程
}
void start(int port) {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
acceptLoop.execute(() -> { // 在接受线程中监听
while (true) {
SocketChannel clientSocket = serverSocket.accept();
if (clientSocket != null) {
workerLoop.execute(() -> handleConnection(clientSocket)); // 将新连接交给工作线程处理
}
}
});
acceptLoop.run(); // 启动接受线程
workerLoop.run(); // 启动工作线程
}
// handleConnection 方法与单线程模型中的相同
}
主从多线程模型是一个主 Reactor 线程加多个子 Reactor 子线程,以及多个工作线程池来处理业务的,如下图所示:
主从多线程模型的实现 Demo 如下:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MainReactorModel {
public static void main(String[] args) {
// 主Reactor,用于接受连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从Reactor,用于处理连接后的读写操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 在这里添加业务处理器,如解码器、编码器、业务逻辑处理器
ch.pipeline().addLast(new MyBusinessHandler());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Server started at port 8080");
future.channel().closeFuture().sync(); // 等待服务器关闭
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NioEventLoop 是如何实现的?它能够保证 Channel 操作的线程安全吗?为什么?
本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。