Netty源码分析——带你领略bootstrap其中滋味

玉米蒸排骨,一道集结了色、香、味等多方面优点的菜品,全家宴缺少它,总觉得全家宴缺少点什么东西,其作用大概可以与贯穿Netty全程的启动类所媲美。

写在之前

上一篇文章用一个 Demo 给大家介绍了 Netty 全貌,相比于写一个简单的例子,但是其中每个部分的细节尤其重要,今天带大家仔细分析 Netty 启动类,bootstrap 底层是如何实现的。

这是 Netty 深入分析的第一篇,在上一篇基础之上,带大家一起看看 Netty 源码是如何实现的,同样以分析启动类为主,会跳过一下细节,比如 EventLoopGroup底层实现、ChannelFuture 细节等,主要以启动类为切入点,介绍 Netty 初始化和启动过程。

整体的思路是先了解其脉络,再探究其细节。

Bootstrap

BootstrapNetty 提供的一个工厂类,服务器端以 ServerBootstrap 为起点,客户端用 Bootstrap 作为初始化工具,本篇以服务器端启动过程为分析点,介绍 Netty 程序是如何工作的。

服务器端入口

下面是一段服务器端启动的代码,只包括最核心内容,其余异常信息会先忽略。

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

// 2. 创建服务端启动类,配置启动参数
ServerBootstrap serverBootstrap = new ServerBootstrap();

// 3. 配置具体的参数,配置具体参数
serverBootstrap.group(bossGroup, workGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.SO_BACKLOG, 128)
  .childOption(ChannelOption.SO_KEEPALIVE, true)
  .childHandler(new ChannelInitializer < SocketChannel > () {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
      //给 pipeline 添加处理器,每当有连接accept时,就会运行到此处。
      socketChannel.pipeline().addLast(new NettyServerHandler());
    }
  });

// 4. 绑定端口并且同步,生成了一个ChannelFuture 对象
ChannelFuture channelFuture = serverBootstrap.bind(8887).sync();

// 5. 对channel进行关闭,注意这里全部都是异步操作
channelFuture.channel().closeFuture().sync();

上面服务器端代码潦潦数行,看似简单,却展示了 Netty 启动所需要的核心过程。现在对各部分简略分析:

(1)EventLoopGroup:无论是客户端还是服务器端都必须指定,EventLoopGroup 其实是一个接口,暂且可以简单的将其看作一个线程组,实现类包括多种,这里指定了 NioEventLoopGroup,表示一个 NIO 的 EventLoopGroup

(2)指定 Channel 类型:这里使用的是 NioServerSocketChannel ,表示服务器端 Channel 类型。

(3)Handler:指定具体数据操作类型。

EventLoopGroup 初始化过程

看上面首先映入眼帘的是两个线程组的初始化,作用简单,为了初始化两个线程组。沿着源码了解一下如何进行初始化过程的,这里初始化的实际是 NioEventLoopGroup。上面说了该类其实是 EventLoopGroup 的实现类,其类结构如下:

NioEventLoopGroup类结构

NioEventLoopGroup 本身调用的是内部重载的构造器,不过其内部都是一些传参的变化,最底层调用的都是其父类MultithreadEventLoopGroup构造方法,具体构造器如下:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object...args) {
  super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里同样调用的是其父类 MultithreadEventExecutorGroup 构造方法,不过这里有一点需要注意,最初我们初始化的时候并没有传入参数,这里会针对线程数额外增加处理过程。

如果我们传入的线程数 nThreads 是0, 那么 Netty 会为我们设置默认的线程数 DEFAULT_EVENT_LOOP_THREADS,这个常量是怎么确定的呢?

其实是 MultithreadEventLoopGroup 中的静态代码块中赋值,会首先从系统属性中获取 io.netty.eventLoopThreads 的值,如果我们没有设置它的话, 那么就返回默认值:** 处理器核心数 * 2**。

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

  // 额外删除日志代码
}

继续往其父类看构造过程

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  EventExecutorChooserFactory chooserFactory, Object...args) {
  // 省略参数检查
  if (nThreads <= 0) {
    throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  }

  if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  }

  // 初始化一个线程执行器的数组
  children = new EventExecutor[nThreads];

  for (int i = 0; i < nThreads; i++) {
    boolean success = false;

    // 真正创建 EventExecutor 对象,不过 EventExecutor 是一个接口,
    // newChild()实现是具体实现类,该方法是一个抽象方法,具体构造方法是具体实现类实现的
    children[i] = newChild(executor, args);
    success = true;
    // 省略try-catch 过程

  }
  // 选择实例化选择器
  chooser = chooserFactory.newChooser(children);

  // 初始化监听器
  final FutureListener < Object > terminationListener = new FutureListener < Object > () {
    @Override
    public void operationComplete(Future < Object > future) throws Exception {
      if (terminatedChildren.incrementAndGet() == children.length) {
        terminationFuture.setSuccess(null);
      }
    }
  };

  // 遍历方式为每个 EventExecutor 添加监听器
  for (EventExecutor e: children) {
    e.terminationFuture().addListener(terminationListener);
  }

  Set < EventExecutor > childrenSet = new LinkedHashSet < EventExecutor > (children.length);
  Collections.addAll(childrenSet, children);
  readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

根据以上代码我们简单梳理一下真正初始化过程

(1)创建一个大小为 nThreads 的 EventExecutor 数组

(2)利用 for 循环通过 newChild() 方法真正创建每个 EventExecutor 对象,实际底层创建的是 NioEventLoop对象

(3)根据 nThreads 的大小,创建不同的 Chooser。即如果 nThreads 是 2 的幂, 则使用 PowerOfTwoEventExecutorChooser(), 反之使用 GenericEventExecutorChooser()。 不论使用哪个 Chooser, 它们的功能都是一样的, 即从 children 数组中选出一个合适的 EventExecutor 实例。

(4)初始化监听器

(5)通过遍历第一步数组,将监听器加到每个 EventExecutorNioEventLoop 对象中

(6)最后将第一步创建的数组元素添加到一个新的可读集合 readonlyChildren 中,至于其有何作用,暂且按下不表。

可能有人会说了,我明明是想要 new 一个 NioEventLoopGroup 对象呀,你怎么给我创建了一个 EventExecuto 数组就结束了呢。

各位先别着急,沿着 newChild() 追踪可以发现,这个方法在 NioEventLoopGroup 类中实现了,其内容很简单,底层就是实例化了一个 NioEventLoop 对象。

/**
 * 实例化一个 NioEventLoop 对象
 */
@Override
protected EventLoop newChild(Executor executor, Object...args) throws Exception {
  EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;

  return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

这里更加证实了我开头所说的:EventLoopGroup是一个线程组。起底层是由多个EventExecutor组成,组最最底层实现对象由NioEventLoop组成。

三者关系如下:

NioEventLoop类结构

最后绘制一下EventLoopGroup的初始化过程吧

EventLoopGroup初始化过程

大概过程:NioEventLoopGroup—》MultithreadEventLoopGroup—》MultithreadEventExecutorGroup—》NioEventLoopGroup

思路是将整体初始化过程交给父类流程化,将真正需要初始化的对象返回到子类实例化

继续沿着代码往下看,ServerBootstrap 过程暂时啥时候都没有做,所有对象添加都交由后续的流式初始的。

NioSocketChannel 初始化过程

serverBootstrap.channel(NioServerSocketChannel.class)

在 Netty 中,Channel 是一个 Socket 的抽象,它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作。每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例。

先看看NioServerSocketChannel类结构:

04-NioServerSocketChannel类结构图

这里主要分析一下NioServerSocketChannel过程,注意这里只是告诉了 bootstrap 想要使用什么类型的 Channel,主要的类型如下:

关于 Channel 的类型 Netty 提供了多种以解决各式各样的网络传输问题,主要如下:

  • NioSocketChannel——代表异步的客户端 TCP Socket 连接.
  • NioServerSocketChannel——异步的服务器端 TCP Socket 连接.
  • NioDatagramChannel—— 异步的 UDP 连接
  • NioSctpChannel—— 异步的客户端 Sctp 连接.
  • NioSctpServerChannel—— 异步的 Sctp 服务器端连接.
  • OioSocketChannel—— 同步的客户端 TCP Socket 连接.
  • OioServerSocketChannel—— 同步的服务器端 TCP Socket 连接.
  • OioDatagramChannel—— 同步的 UDP 连接
  • OioSctpChannel—— 同步的 Sctp 服务器端连接.
  • OioSctpServerChannel—— 同步的客户端 TCP Socket 连接.

这里是通过工厂模式来初始化 Channel,该处只是创建了对应类型的工厂,初始化了一个 ReflectiveChannelFactory类型的工厂,需要实现的类为 NioServerSocketChannel

/**
 * 利用反射工厂来创建 channel 对象,默认通过无参构造器来初始化
 * @param channelClass
 * @return
 */
public B channel(Class < ? extends C > channelClass) {
  return channelFactory(new ReflectiveChannelFactory < C > (
    ObjectUtil.checkNotNull(channelClass, "channelClass")
  ));
}

但是真正的初始化过程是在后续绑定监听端口中异步完成的,但是此处顺带一起分析,核心代码如下:

final ChannelFuture initAndRegister() {
  Channel channel = null;

  // 1. 通过反射工厂(NioServerSocketChannel)获取 channel
  channel = channelFactory.newChannel();
  // 2. 初始化channel
  init(channel);

  // 此处先省略其他代码
}

分析channel对象实例化

本质上是通过反射方式调用NioServerSocketChannel()的无参构造器。我们看到, 在这个构造器中, 会调用 newSocket 来打开一个新的 Java NIO SocketChannel

public NioServerSocketChannel() {
  this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * 打开一个JDK提供的ServerSocketChannel
 *
 * @param provider
 * @return
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {

  // 同样忽略掉try……catch

  return provider.openServerSocketChannel();

}

接着会一路调用其父类,即 AbstractNioMessageChannel 构造器,传入参数 parent 为 null; ch 为刚才使用 newSocket 创建的 Java NIO SocketChannel, 因此生成的 NioSocketChannelparent channel 是空的,事件类型为读事件。

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  super(parent, ch, readInterestOp);
}

继续调用其父类 AbstractNioChannel ,主要是设置 channel 类型为非阻塞。

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  super(parent);
  this.ch = ch;
  this.readInterestOp = readInterestOp;

  // 将channel设置为非阻塞的
  ch.configureBlocking(false);

  // 此处忽略try……catch逻辑
}

再继续调用父类AbstractChannel 构造器

protected AbstractChannel(Channel parent) {
  this.parent = parent;
  // 生成唯一的channel Id
  id = newId();
  // 实例化一个unsafe对象
  unsafe = newUnsafe();
  // pipeline 是 new DefaultChannelPipeline(this) 新创建的实例,
  // 每个channel都有自己的ChannelPipeline
  pipeline = newChannelPipeline();
}

到这里, 一个完整的 NioSocketChannel 就实例化完成了,我们可以稍微总结一下构造一个 NioSocketChannel 所需要做的工作。

  1. NioServerSocketChannel 中调用 newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO SocketChannel,同时传入感兴趣事件——SelectionKey.OP_ACCEPT
  2. AbstractNioMessageChannel 没有做具体操作
  3. AbstractNioChannel 中将 channel 设置为非阻塞的
  4. AbstractChannel 中初始化 AbstractChannel 的属性,主要内容包括
  • 设置 parent 属性
  • 设置唯一的 channelId
  • 实例化一个 unsafe 对象,其底层实例化的其实是 NioMessageUnsafe 对象,这个类后续会介绍,此处可以将其看作为 channel 关联的 Jdk 本地方法。
  • 为每个 channel 分配自己的 ChannelPipeline【每个 channel 都有自己的 pipeline ,都是在channel 实例化的时候创建】

到这里 channle 实例化过程就已经结束,接下来仔细分析在这一行实例化 channel 之后,如何初始化 Channel 属性、初始化 Pipeline 等组件的呢。

ChannelFuture channelFuture = serverBootstrap.bind(8887).sync();

整体调用实例化调用类关系如下:

NioServerSocketChannel——》AbstractNioMessageChannel——》AbstractNioChannel——》AbstractChannel

ChannelPipeline初始化过程

在上一个方法中我们初始化 channel 时,最后初始化了 ChannelPipeline 对象,

关于 ChannelPIpeline 作用,此处先不做特别详细的分析,先大概讲一下它的作用:

每个创建一个 channel 都会被分配一个新的 ChannelPipeline ,这项关联是永久性的,channel、ChannelPipeline、handler、ChannelHandlerContext 关系大概如下图。(可能有人会对该关系不太理解,先不用太着急,先了解整体脉络再解剖具体细节)

ChannelPipeline

首先在 AbstractChannel 的构造器中看到了 pipeline 字段被初始化为 DefaultChannelPipeline 的实例。那么我们就来看一下,,DefaultChannelPipeline 构造器做了哪些工作吧:

protected DefaultChannelPipeline newChannelPipeline() {
  return new DefaultChannelPipeline(this);
}

这里直接调用了 DefaultChannelPipeline 的构造方法

protected DefaultChannelPipeline(Channel channel) {
  // 判空校验
  this.channel = ObjectUtil.checkNotNull(channel, "channel");
  // 初始化 future 对象
  succeededFuture = new SucceededChannelFuture(channel, null);
  voidPromise = new VoidChannelPromise(channel, true);

  // 初始化head和tail的首位 PipelineContext 对象
  tail = new TailContext(this);
  head = new HeadContext(this);
  // 组成双向链表
  head.next = tail;
  tail.prev = head;
}

传入了一个 channel, 而这个 channel 其实就是我们实例化的 NioSocketChannelDefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在 channel 字段中。

DefaultChannelPipeline 中, 还有两个特殊的字段, 即 head 和 tail, 而这两个字段是一个双向链表的头和尾。 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键。

此时代码中 channel 结构为

ChannelPipeline作用

后续会有专门分析 ChannelPIpeline 章节,为了避免文章过长,此处先不展开

到这里我们已经分析完了启动类的一小部分,分别为 EventLoopGroup 初始化过程,Channel 初始化过程,ChannelPipeline 初始化过程等三个阶段。

未完待续

启动类过程未完成阶段
1、初始化好的 channel 如何注册到 EventLoopGroup
2、handler 如何添加到 ChannelPipeline
3、服务器如何绑定到本地端口均未揭晓。

这三部分内容放到 bootstrap 其中滋味下篇解析。

遗留问题解答

回答一下上面留得问题:

1、 实例化 channel 时通过 newUnsafe() 方法 new 出来的 unsafe 对象有何作用?

其实 unsafe 特别关键, 它封装了对 Java 底层 Socket 的操作, 因此实际上是沟通 Netty 上层和 Java 底层的重要的桥梁。

unsafe 接口提供的都是 Java 底层的 Socket 的操作,这也就是为什么人们老说 Netty 其实是对 JDk 的 NIO 的封装,封装之后如何调用底层就是通过 Unsafe 接口提供的方法完成。