作者:京东物流 张弓言
Netty 是一款优秀的高性能网络框架,内部通过 NIO 的方式来处理网络请求,在高负载下也能可靠和高效地处理 I/O 操作
作为较底层的网络通信框架,其被广泛应用在各种中间件的开发中,比如 RPC框架、MQ、Elasticsearch等,这些中间件框架的底层网络通信模块大都利用到了 Netty 强大的网络抽象
下面这篇文章将主要对 Netty 中的各个组件进行分析,并在介绍完了各个组件之后,通过 JSF 这个 RPC 框架为例来分析 Netty 的使用,希望让大家对 Netty 能有一个清晰的了解
通过 Netty 来构建一个简易服务端是比较简单的,代码如下:
public class NettyServer {
public static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("Handler Added");
}
})
.childHandler(new ServerChannelInitializer())
.bind(8100);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("Netty Server Start !");
}
}
});
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
上面代码的主要逻辑如下:
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 1. 启动类
ChannelFuture channelFuture = new Bootstrap()
// 2. 添加 EventLoop
.group(workGroup)
// 3. 选择客户端 channel 实现
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ZAS ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 连接到服务器
.connect(new InetSocketAddress("localhost", 8100));
channelFuture.addListener(future -> {
if (future.isSuccess()) {
((ChannelFuture) future).channel().writeAndFlush("hello");
}
});
channelFuture.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
上面代码的主要逻辑如下:
从上面的的两个例子可以看出,如果想通过 Netty 实现一个简易的服务器其实是非常简单的,只需要在启动引导类中设置好对应属性,然后完成端口绑定就可以实现。但也正是因为这种简易的实现方式,导致很多人在学习 Netty 的过程中,发现代码是写的出来,但是对内部的组件有什么作用以及为什么这么写可能就不是很清楚了,因此希望通过这一系列文章来加深大家对 Netty 的理解
Netty 中的 Channel 可以看成网络编程中的 Socket,其提供了一系列 IO 操作的 API,比如 read、write、bind、connect 等,大大降低了直接使用 Socket 类的复杂性
整体类继承关系如下:
从上面的继承关系可以看出,NioSocketChannel 和 NioServerSocketChannel 分别对应客户端和服务端的 Channel,两者的直接父类不一致,因此对外提供的功能也是不相同的。比如当发生 read 事件时,NioServerSocketChannel 的主要逻辑就是建立新的连接,而 NioSocketChannel 则是读取传输的字节进行业务处理
下面就以 NioServerSocketChannel 为例,带大家了解下该类的初始化过程,整体流程如下:
在上面的服务端启动过程中,ServerBootstrap 调用 channel() 方法并传入 NioServerSocketChannel,其底层代码逻辑为:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// ReflectiveChannelFactory 构造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
整体逻辑很简单,通过传入的 Class 对象指定一个 Channel 反射工厂,后续调用工厂方法获取指定类型的 Channel 对象
当服务端启动引导类 ServerBootstrap 调用 bind() 方法之后,内部会走到 Channel 的实例化过程,代码精简如下:
// channel 初始化流程,内部通过 channelFactory 构造
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
}
// channelFactory 的 newChannel 方法逻辑
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
ChannelFactory 的整体逻辑就是通过反射的方式新建 Channel 对象,而 Channel 对象的类型则是在启动引导类中通过 channel() 方法进行指定的
在实例化 Channel 的过程中,会对其内部的一些属性进行初始化,而对这些属性的了解,可以使我们对 Netty 中各个组件的作用范围有一个更加清晰的理解,下面看下 NioServerSocketChannel 的构造函数源码
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
上述源码就是一层一层的父类构造,可以对照前面的类关系图进行阅读
NioServerSocketChannel 实例化过程中主要完成了以下内部属性的初始化:
对于 Channel 的实例化流程可以总结如下:
ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性
ChannelPipeline 底层初始化源码:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
从 ChannelPipeline 的构造函数可以看出,每一个 ChannelPipeline 底层都是一个双向链表结构,默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理,处理细节会在后续文章中展现
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回调 ChannelHandler 中的 handlerAdded() 方法
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理
整个方法的逻辑为:
Channel、ChannelPipeline和 ChannelHandler 三者的关系如图所示:
Netty 正是通过 ChannelPipeline 这一结构为用户提供了自定义业务逻辑的扩展点,用户只需要向 ChannelPipeline 中添加处理对应业务逻辑的 ChannelHandler,之后当指定事件发生时,该 ChannelHandler 中的对应方法就会进行回调,实现业务的处理
ChannelHandler 是 Netty 中业务处理的核心类,当有 IO 事件发生时,该事件会在 ChannelPipeline 中进行传播,并依次调用到 ChannelHandler 中的指定方法
ChannelHandler 的类继承关系如下:
从上面的类继承关系可以看出,ChannelHandler 大致可以分为 ChannelInboundHandler 和 ChannelOutboundHandler,分别用来处理读、写事件
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
在 ChannelInboundHandler 中定义了一系列的回调方法,用户可以实现该接口并重写相应的方法来自定义的业务逻辑。
重写方法逻辑是简单的,但很多人其实不清楚的是这些回调方法到底在什么场景下会被调用,如何调用,只有了解了这些回调方法的调用时机,才能在更适宜的地方完成相应功能
channelRegistered() 从方法名理解是当 Channel 完成注册之后会被调用,那么何为 Channel 注册?
下面就以 Netty 服务端启动过程中的部分源码为例(详细源码分析会在后续文章中),看下 channelRegistered() 的调用时机
在 Netty 服务端启动时,会调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法,精简代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// neverRegistered 初始值为 true
boolean firstRegistration = neverRegistered;
// 将 Channel 绑定到对应 eventLoop 中的 Selector 上
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 调用 ChannelHandler 中的 ChannelRegistered()
pipeline.fireChannelRegistered();
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
从 Netty 底层的 register() 方法可以看出,ChannelHandler 中的 ChannelRegistered() 调用时机是在调用 pipeline.fireChannelRegistered() 时触发的,此时已经完成的逻辑为:
因此当 Channel 和对应的 Selector 完成了绑定,Channel 中 pipeline 上绑定的 ChannelHandler 的channelRegisted() 方法就会进行回调
上面已经分析了channelRegistered() 方法的调用时机,也就是当 Channel 绑定到了对应 Selector 上之后就会进行回调,下面开始分析 channelActive() 方法的调用时机
对于服务端 Channel,前面还只是将 Channel 注册到了 Selector 上,还没有调用到 bind() 方法完成真正的底层端口绑定,那么有没有可能当服务端 Channel 完成端口监听之后,就会调用到 channelActive() 方法呢?
下面继续分析,在上面完成了 Channel 和 Selector 的注册之后,Netty 服务端启动过程中会继续调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#bind 逻辑:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
在该方法中完成了以下逻辑:
总结:
当 Channel 调用了 bind() 方法完成端口绑定之后,channelActive() 方法会进行回调
该方法的调用时机,服务端和客户端是不一致的
服务端 Channel 绑定到 Selector 上时监听的是 Accept 事件,当客户端有新连接接入时,会回调 channelRead() 方法,完成新连接的接入
Netty 在服务端启动过程中,会默认添加一个 ChannelHandler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 来处理新连接的接入
当服务端处理完 Accept 事件后,会生成一个和客户端通信的 Channel,该 Channel 也会注册到对应的 Selector 上,并监听 read 事件
当客户端向该 Channel 中发送数据时就会触发 read 事件,调用到 channelRead() 方法(Netty 内部的源码处理会在后续的文章中进行分析)
当前 ChannelHandler 中各回调方法处理过程中如果发生了异常就会回调该方法