发布于2021-05-29 20:35 阅读(828) 评论(0) 点赞(30) 收藏(5)
【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(一)】Netty 初始化总览
【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(二)】主线流程 new NioEventLoopGroup(nThreads) 究竟做了什么
【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三上)】initAndRegister() 之 init() 怒了!这文章为啥写成了这样??
【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三中)】initAndRegister() 之 init() 对不起!!我柜子动了…我不学了
在开始本篇之前,我们先整体回顾一下上四篇文章我们究竟说了些什么。温故而知新。
我们主要围绕以下三行代码进行了原理的跟踪,探秘一行方法的背后究竟做了什么。
new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelFuture cf = bootstrap.bind(9000).sync();
系列第三部分 上中篇 主要跟进了 bootstrap.bind(9000).sync();
中 initAndRegister()
方法的 init()
方法,并拓展了其中 Netty 自定义的任务提交执行逻辑
其中 initAndRegister()
方法主要分为 三 部分的主逻辑
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
本篇将跟进 initAndRegister
最后一部分主逻辑 register()
让我们聚焦于 register
这一行代码
ChannelFuture regFuture = config().group().register(channel);
在跟 register()
之前我们需要首先确定 config().group()
拿到的是什么,其调用的是抽象父类AbstractBootstrapConfig
的 group()
方法,这里最终 return 了 group 。跟过之前文章的同学应该了解,这个 group 是初始化时传入的 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
即我们的 bossGroup
public abstract class AbstractBootstrapConfig
public final EventLoopGroup group() {
return bootstrap.group();
}
public abstract class AbstractBootstrap
@Deprecated
public final EventLoopGroup group() {
return group;
}
这里调用的是 public interface EventLoopGroup
的实现类 public abstract class MultithreadEventLoopGroup
的实现方法
public interface EventLoopGroup extends EventExecutorGroup
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
原因是:
public class NioEventLoopGroup extends MultithreadEventLoopGroup
public final class NioEventLoop extends SingleThreadEventLoop
记住这两个继承关系。
这里的 next()
方法调用的是 PowerOfTwoEventExecutorChooser
的 next()
方法,最终拿到的是 bossGroup 中的 NioEventLoop
,这里绕了一圈最终调用的是 SingleThreadEventLoop
的 register 实现
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
跟踪到 AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelProm
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loo
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration t
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
这里最核心的逻辑为 register0(promise)
以及 eventLoop.execute
。通过 debug 看这里的逻辑其实并不会走到 else 逻辑即不会运行到 eventLoop.execute
方法,这里埋个伏笔,既然这里没有进行 execute 那么代表并没有开启 select 的监听,唯一还有机会的地方是在 init 方法中添加到 pipline 的 ChannelInitializer ,那么何时调用了呢?让我们往下看
eventLoop.execute
在 【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三中)】initAndRegister() 之 init() 对不起!!我柜子动了…我不学了 有详细的分析,让我们聚焦于 if 中的 register0(promise)
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
这里重点关注一下几行代码:
doRegister();
pipeline.invokeHandlerAddedIfNeeded();
pipeline.fireChannelRegistered();
doRegister()
方法 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
其实现就是 Nio 代码中的注册事件的逻辑,但注意这里传入的 ops 是 0 ,在这里是什么都不会做的,在后续会调用 AbstractNioCHannel
的 doBeginRead()
中进行修正。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
修正代码:
selectionKey.interestOps(interestOps | readInterestOp);
PS:在我看来这种设计并不优美……。
这里贴上 github 上对于此问题的 issues 大家可以参考一下:
https://github.com/netty/netty/issues/1836
Trustin Lee 是这么回答的:
It’s an intentional stuff to deal with a potential JDK bug where it returns a selection key with readyOps set to 0 for no good reason, leading to a spin loop. So, I’d leave it as it is.
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
首先简单翻译一下这个方法名: ”调用需要添加的句柄“
在通知 promise 之前确保我们调用了 handlerAdded()
方法,这是必需的,因为用户可能已经通过 ChannelFutureListener 中的管道触发事件。
简单理解 promise 是 Netty 对 Java 并发包 Future 接口的实现,同时在其基础上做了拓展,这里就不进行展开讲解了。
这里是 promise 是 register 调用链中 new 的一个 DefaultChannelPromise 传入的 channel 是 NioServerSocketChannel
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
继续跟踪 void invokeHandlerAddedIfNeeded()
看看这个方法究竟干了什么
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
强调提醒一下,这里调用的是 DefaultChannelPipeline 的方法。也就是我们上文提到的 pipeline 。稍微回串一下之前的 init 代码。这里的 channel 是初始化传入的 NioServerSocketChannel.class
,是通过 channel = channelFactory.newChannel();
工厂创建出来的,pipline 则是在调用其构造方法时初始化的 pipeline = newChannelPipeline();
有些忘记的同学建议回顾一下前两篇
【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三上)】initAndRegister() 之 init() 怒了!这文章为啥写成了这样??
【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三中)】initAndRegister() 之 init() 对不起!!我柜子动了…我不学了
有了上述的回顾,再来看看他后面究竟做了什么。
注释是这样描述的:
现在我们已经注册到 EventLoop 。是时候回调所有已经在注册完成之前添加好的 ChannelHandler 了。
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
通过 deubg 来看这里其实只有一个 task 执行
继续跟踪会最终调用到 ChannelInitializer
的 handlerAdded
方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
继续跟踪你会发现,最终这里会调用在之前 init 方法中重写的方法
整个方法又串了起来,在 init 方法中重写的 initChannel
方法最终在 register 时候进行了调用,且在最终调用完成后进行了移除
finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
官方也给出了相应的注释:
对于我们当前的 DefaultChannelPipeline 的实现,这里将总是 true
在 handlerAdded(…) 中调用 initChannel(…) 的好处是,如果 ChannelInitializer 将添加另一个 ChannelInitializer 将不会出现排序意外,这是因为所有处理程序都将按预期顺序添加。
我们已经完成了 Channel 的初始化工作,现在移除初始化程序。
总结一下: pipeline.invokeHandlerAddedIfNeeded
将会运行在 init 时添加到 pipline
中的 ChannelInitializer
的 initChannel(……)
方法,并执行了 ch.eventLoop().execute(……)
方法,伏笔消除,此时开启了循环监听,监听注册事件 。并且在初始化后,将进行移除。最后在 pipline 中添加了一个 ServerBootstrapAcceptor
调用 AbstractChannelHandlerContext.invokeChannelRegistered(head);
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
}
这里整体上其实就是一个嵌套的循环调用由 invokeChannelRegistered(final AbstractChannelHandlerContext next)
到无参方法 void invokeChannelRegistered()
再到 ((ChannelInboundHandler) handler()).channelRegistered(this)
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
}
通过 debug 的方式可以跟踪到这里的 channelRegistered
调用的是 DefaultChannelPipeline
内部类 HeadContext
中的方法。
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
}
}
职责链的最后一块拼图(说实话这个设计真的很恶心……)ctx.fireChannelRegistered();
这行代码最终调用的是 AbstractChannelHandlerContext
中的实现 findContextInbound()
会按顺序找出 pipline 中的 inboundHandler
返回,依次循环
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
}
总结一下: pipeline.fireChannelRegistered()
这个方法会调用 pipline 中的所有 handler 的 channelRegistered(……)
方法。
那么此时 register 的逻辑就走完了,总结下大逻辑即:
doRegister(); // 注册 selectKey 将 OP_ACCEPT = 16 注册进来
pipeline.invokeHandlerAddedIfNeeded(); // 回调 init 中初始化的 ChannelInitializer,开启 select 循环监听,移除 pipline 中 初始化 ChannelInitializer 并添加 ServerBootstrapAcceptor
pipeline.fireChannelRegistered();// 调用 pipline 链路中所有 handler 的 channelRegistered 方法
到这里 initAndRegister()
逻辑终于走完。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
最后一个核心 doBind0()
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise)
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
这里同样是一个套路,循环找到 outbound 调用其 invokeBind 方法
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
会调用到 ChannelOutboundHandler 的 bind 方法那么这里只有一个 HeadContext ,所以只调用了 Head 的 bind 方法
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
}
那么最后的底层和 Nio 代码一样会调用系统的 native 方法。
至此 Server 端的 ChannelFuture cf = bootstrap.bind(9000).sync();
整体流程就走完了。
NettyServer 端的逻辑到此就告一段落,Client 端来势汹汹。注册监听搞定,那连接事件如何处理呢?个人定制的 Handler 究竟何时执行,希望你准备好迎接新的挑战了。
感谢你能看到这里,希望本文对你有所帮助,希望你在阅读的过程中也可以实战跟进源码内部,自己跟进才能有更好的理解。
我是 dying 搁浅 ,我始终期待与你的相遇。无论你是否期待,潮涨潮落,我仅且就在这里…
欢迎添加个人微信 dyinggq 一起交流学习~~
我们下期再见~
原文链接:https://blog.csdn.net/w903328615/article/details/116948452
作者:以天使的名义
链接:http://www.javaheidong.com/blog/article/207176/0e1dee879480f2fa0284/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!