程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三下)】server register 将何去何从?

发布于2021-05-29 20:35     阅读(828)     评论(0)     点赞(30)     收藏(5)


目录:

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(一)】Netty 初始化总览

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(二)】主线流程 new NioEventLoopGroup(nThreads) 究竟做了什么

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三上)】initAndRegister() 之 init() 怒了!这文章为啥写成了这样??

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三中)】initAndRegister() 之 init() 对不起!!我柜子动了…我不学了

写在前面

在开始本篇之前,我们先整体回顾一下上四篇文章我们究竟说了些什么。温故而知新。

我们主要围绕以下三行代码进行了原理的跟踪,探秘一行方法的背后究竟做了什么。

new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelFuture cf = bootstrap.bind(9000).sync();

系列第三部分 上中篇 主要跟进了 bootstrap.bind(9000).sync();initAndRegister() 方法的 init() 方法,并拓展了其中 Netty 自定义的任务提交执行逻辑

其中 initAndRegister() 方法主要分为 三 部分的主逻辑

 channel = channelFactory.newChannel();
 init(channel);
 ChannelFuture regFuture = config().group().register(channel);

本篇将跟进 initAndRegister 最后一部分主逻辑 register()

initAndRegister 之 register()

让我们聚焦于 register 这一行代码

ChannelFuture regFuture = config().group().register(channel);

在跟 register() 之前我们需要首先确定 config().group() 拿到的是什么,其调用的是抽象父类AbstractBootstrapConfiggroup() 方法,这里最终 return 了 group 。跟过之前文章的同学应该了解,这个 group 是初始化时传入的 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 即我们的 bossGroup

public abstract class AbstractBootstrapConfig
public final EventLoopGroup group() {
        return bootstrap.group();
}

public abstract class AbstractBootstrap
@Deprecated
public final EventLoopGroup group() {
     		return group;
}

这里调用的是 public interface EventLoopGroup 的实现类 public abstract class MultithreadEventLoopGroup 的实现方法

public interface EventLoopGroup extends EventExecutorGroup
@Override
public ChannelFuture register(ChannelPromise promise) {
     return next().register(promise);
}

原因是:

public class NioEventLoopGroup extends MultithreadEventLoopGroup
public final class NioEventLoop extends SingleThreadEventLoop

记住这两个继承关系。

这里的 next() 方法调用的是 PowerOfTwoEventExecutorChoosernext() 方法,最终拿到的是 bossGroup 中的 NioEventLoop ,这里绕了一圈最终调用的是 SingleThreadEventLoop 的 register 实现

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

跟踪到 AbstractUnsafe

@Override
public final void register(EventLoop eventLoop, final ChannelProm
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered 
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loo
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration t
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

这里最核心的逻辑为 register0(promise) 以及 eventLoop.execute 。通过 debug 看这里的逻辑其实并不会走到 else 逻辑即不会运行到 eventLoop.execute 方法,这里埋个伏笔,既然这里没有进行 execute 那么代表并没有开启 select 的监听,唯一还有机会的地方是在 init 方法中添加到 pipline 的 ChannelInitializer ,那么何时调用了呢?让我们往下看

eventLoop.execute【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三中)】initAndRegister() 之 init() 对不起!!我柜子动了…我不学了 有详细的分析,让我们聚焦于 if 中的 register0(promise)

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

这里重点关注一下几行代码:

doRegister();
pipeline.invokeHandlerAddedIfNeeded();
pipeline.fireChannelRegistered();

1.doRegister()

doRegister() 方法 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 其实现就是 Nio 代码中的注册事件的逻辑,但注意这里传入的 ops 是 0 ,在这里是什么都不会做的,在后续会调用 AbstractNioCHanneldoBeginRead() 中进行修正。

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

修正代码:

selectionKey.interestOps(interestOps | readInterestOp);

PS:在我看来这种设计并不优美……。

这里贴上 github 上对于此问题的 issues 大家可以参考一下:

https://github.com/netty/netty/issues/1836

Trustin Lee 是这么回答的:

It’s an intentional stuff to deal with a potential JDK bug where it returns a selection key with readyOps set to 0 for no good reason, leading to a spin loop. So, I’d leave it as it is.

2.pipeline.invokeHandlerAddedIfNeeded()

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

首先简单翻译一下这个方法名: ”调用需要添加的句柄“

在通知 promise 之前确保我们调用了 handlerAdded() 方法,这是必需的,因为用户可能已经通过 ChannelFutureListener 中的管道触发事件。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QLjwJwqD-1622125408984)(/Users/admin/Library/Application Support/typora-user-images/image-20210526214350525.png)]

简单理解 promise 是 Netty 对 Java 并发包 Future 接口的实现,同时在其基础上做了拓展,这里就不进行展开讲解了。

这里是 promise 是 register 调用链中 new 的一个 DefaultChannelPromise 传入的 channel 是 NioServerSocketChannel

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

继续跟踪 void invokeHandlerAddedIfNeeded() 看看这个方法究竟干了什么

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
        // that were added before the registration was done.
        callHandlerAddedForAllHandlers();
    }
}

强调提醒一下,这里调用的是 DefaultChannelPipeline 的方法。也就是我们上文提到的 pipeline 。稍微回串一下之前的 init 代码。这里的 channel 是初始化传入的 NioServerSocketChannel.class ,是通过 channel = channelFactory.newChannel(); 工厂创建出来的,pipline 则是在调用其构造方法时初始化的 pipeline = newChannelPipeline(); 有些忘记的同学建议回顾一下前两篇

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三上)】initAndRegister() 之 init() 怒了!这文章为啥写成了这样??

【对话写 Netty 代码的同学,你真的懂 Netty 了吗?(三中)】initAndRegister() 之 init() 对不起!!我柜子动了…我不学了

在这里插入图片描述

有了上述的回顾,再来看看他后面究竟做了什么。

注释是这样描述的:

现在我们已经注册到 EventLoop 。是时候回调所有已经在注册完成之前添加好的 ChannelHandler 了。

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        // This Channel itself was registered.
        registered = true;
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

通过 deubg 来看这里其实只有一个 task 执行

在这里插入图片描述

继续跟踪会最终调用到 ChannelInitializerhandlerAdded 方法

在这里插入图片描述

 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
     if (ctx.channel().isRegistered()) {
         // This should always be true with our current DefaultChannelPipeline implementation.
         // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
         // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
         // will be added in the expected order.
         if (initChannel(ctx)) {
             // We are done with init the Channel, removing the initializer now.
             removeState(ctx);
         }
     }
 }
 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
     if (initMap.add(ctx)) { // Guard against re-entrance.
         try {
             initChannel((C) ctx.channel());
         } catch (Throwable cause) {
             // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
             // We do so to prevent multiple calls to initChannel(...).
             exceptionCaught(ctx, cause);
         } finally {
             ChannelPipeline pipeline = ctx.pipeline();
             if (pipeline.context(this) != null) {
                 pipeline.remove(this);
             }
         }
         return true;
     }
     return false;
 }

继续跟踪你会发现,最终这里会调用在之前 init 方法中重写的方法

在这里插入图片描述

整个方法又串了起来,在 init 方法中重写的 initChannel方法最终在 register 时候进行了调用,且在最终调用完成后进行了移除

finally {
             ChannelPipeline pipeline = ctx.pipeline();
             if (pipeline.context(this) != null) {
                 pipeline.remove(this);
             }
         }

官方也给出了相应的注释:

对于我们当前的 DefaultChannelPipeline 的实现,这里将总是 true

在 handlerAdded(…) 中调用 initChannel(…) 的好处是,如果 ChannelInitializer 将添加另一个 ChannelInitializer 将不会出现排序意外,这是因为所有处理程序都将按预期顺序添加。

我们已经完成了 Channel 的初始化工作,现在移除初始化程序。

总结一下pipeline.invokeHandlerAddedIfNeeded 将会运行在 init 时添加到 pipline 中的 ChannelInitializerinitChannel(……)方法,并执行了 ch.eventLoop().execute(……)方法,伏笔消除,此时开启了循环监听,监听注册事件 。并且在初始化后,将进行移除。最后在 pipline 中添加了一个 ServerBootstrapAcceptor

3.pipeline.fireChannelRegistered()

调用 AbstractChannelHandlerContext.invokeChannelRegistered(head);

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
}

这里整体上其实就是一个嵌套的循环调用由 invokeChannelRegistered(final AbstractChannelHandlerContext next) 到无参方法 void invokeChannelRegistered() 再到 ((ChannelInboundHandler) handler()).channelRegistered(this)

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
      
  	static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }
  
    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

通过 debug 的方式可以跟踪到这里的 channelRegistered 调用的是 DefaultChannelPipeline 内部类 HeadContext 中的方法。

public class DefaultChannelPipeline implements ChannelPipeline {
    
  final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

				@Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }
  }
}

职责链的最后一块拼图(说实话这个设计真的很恶心……)ctx.fireChannelRegistered(); 这行代码最终调用的是 AbstractChannelHandlerContext 中的实现 findContextInbound() 会按顺序找出 pipline 中的 inboundHandler 返回,依次循环

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    
    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }
  
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
	
}

总结一下: pipeline.fireChannelRegistered() 这个方法会调用 pipline 中的所有 handler 的 channelRegistered(……) 方法。

register 总结

那么此时 register 的逻辑就走完了,总结下大逻辑即:

doRegister(); // 注册 selectKey 将 OP_ACCEPT = 16 注册进来
pipeline.invokeHandlerAddedIfNeeded(); // 回调 init 中初始化的 ChannelInitializer,开启 select 循环监听,移除 pipline 中 初始化 ChannelInitializer 并添加 ServerBootstrapAcceptor
pipeline.fireChannelRegistered();// 调用 pipline 链路中所有 handler 的 channelRegistered 方法

到这里 initAndRegister() 逻辑终于走完。

回到 AbstractBootstrap.doBind(……)

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

最后一个核心 doBind0()

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) 
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

这里同样是一个套路,循环找到 outbound 调用其 invokeBind 方法

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

会调用到 ChannelOutboundHandler 的 bind 方法那么这里只有一个 HeadContext ,所以只调用了 Head 的 bind 方法

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
  @Override
  public void bind(
          ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
      unsafe.bind(localAddress, promise);
  }
}

那么最后的底层和 Nio 代码一样会调用系统的 native 方法。

至此 Server 端的 ChannelFuture cf = bootstrap.bind(9000).sync(); 整体流程就走完了。

写在最后

NettyServer 端的逻辑到此就告一段落,Client 端来势汹汹。注册监听搞定,那连接事件如何处理呢?个人定制的 Handler 究竟何时执行,希望你准备好迎接新的挑战了。

感谢你能看到这里,希望本文对你有所帮助,希望你在阅读的过程中也可以实战跟进源码内部,自己跟进才能有更好的理解。

我是 dying 搁浅 ,我始终期待与你的相遇。无论你是否期待,潮涨潮落,我仅且就在这里…

欢迎添加个人微信 dyinggq 一起交流学习~~

我们下期再见~
在这里插入图片描述

原文链接:https://blog.csdn.net/w903328615/article/details/116948452



所属网站分类: 技术文章 > 博客

作者:以天使的名义

链接:http://www.javaheidong.com/blog/article/207176/0e1dee879480f2fa0284/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

30 0
收藏该文
已收藏

评论内容:(最多支持255个字符)