大家好,欢迎来到IT知识分享网。
有情怀,有干货,微信搜索【三太子敖丙】关注这个不一样的程序员。
本文 GitHub github.com/JavaFamily 已收录,有一线大厂面试完整考点、资料以及我的系列文章。
高能预警,本文是我一个月前就开始写的,所以内容会非常长,当然也非常硬核,dubbo源码系列结束之后我就想着写一下netty系列的,但是netty的源码概念又非常多,所以才写到了现在。
我相信90%的读者都不会一口气看完的,因为实在太长了,长到我现在顶配的mbp打字编辑框都是卡的,但是我希望大家日后想看netty或者在面试前需要了解的朋友回头翻一下就够了,那我写这个文章的意义也就有了。
也不多BB,直接开整。
NIO 基本概念
阻塞(Block)与非阻塞(Non-Block)
阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。
阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他的事情,否则一直等待在那里。
非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。
阻塞 IO :
非阻塞 IO :
同步(Synchronous)与异步(Asynchronous)
同步和异步都是基于应用程序和操作系统处理 IO 事件所采用的方式。比如
**同步:**是应用程序要直接参与 IO 读写的操作。
**异步:**所有的 IO 读写交给操作系统去处理,应用程序只需要等待通知。
同步方式在处理 IO 事件的时候,必须阻塞在某个方法上面等待我们的 IO 事件完成(阻塞 IO 事件或者通过轮询 IO事件的方式),对于异步来说,所有的 IO 读写都交给了操作系统。这个时候,我们可以去做其他的事情,并不需要去完成真正的 IO 操作,当操作完成 IO 后,会给我们的应用程序一个通知。
所以异步相比较于同步带来的直接好处就是在我们处理IO数据的时候,异步的方式我们可以把这部分等待所消耗的资源用于处理其他事务,提升我们服务自身的性能。
同步 IO :
异步 IO :
Java BIO与NIO对比
BIO(传统IO):
BIO是一个同步并阻塞的IO模式,传统的 java.io 包,它基于流模型实现,提供了我们最熟知的一些 IO 功能,比如File抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。
NIO(Non-blocking/New I/O)
NIO 是一种同步非阻塞的 I/O 模型,于 Java 1.4 中引入,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 Socket
和 ServerSocket
相对应的 SocketChannel
和 ServerSocketChannel
两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发
BIO与NIO的对比
IO模型 | BIO | NIO |
---|---|---|
通信 | 面向流 | 面向缓冲 |
处理 | 阻塞 IO | 非阻塞 IO |
触发 | 无 | 选择器 |
NIO 的 Server 通信的简单模型:
BIO 的 Server 通信的简单模型:
NIO的特点:
- 一个线程可以处理多个通道,减少线程创建数量;
- 读写非阻塞,节约资源:没有可读/可写数据时,不会发生阻塞导致线程资源的浪费
Reactor 模型
单线程的 Reactor 模型
多线程的 Reactor 模型
多线程主从 Reactor 模型
Netty 基础概念
Netty 简介
Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。
“快速简便”并不意味着最终的应用程序将遭受可维护性或性能问题的困扰。Netty 经过精心设计,结合了许多协议(例如FTP,SMTP,HTTP 以及各种基于二进制和文本的旧式协议)的实施经验。结果,Netty 成功地找到了一种无需妥协即可轻松实现开发,性能,稳定性和灵活性的方法。
Netty 执行流程
Netty 核心组件
Channel
Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。
EventLoop 与 EventLoopGroup
EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。
ServerBootstrap 与 Bootstrap
Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。
Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。
ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。
ChannelHandler 与 ChannelPipeline
ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。
ChannelFuture
Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。
Netty 的异步编程模型都是建立在 Future 与回调概念之上的。
Netty 源码阅读
源码阅读,最好可以再 Debug 的情况下进行,这样更容易帮助理解,因此在分析 Netty 前的我准备一个客户端和服务端的代码。
Netty – Server 代码
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new SomeSocketServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("服务器已启动。。。");
future.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
IT知识分享网
Server 端 Handler:
IT知识分享网public class DemoSocketServerHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
ctx.fireChannelActive();
TimeUnit.MILLISECONDS.sleep(500);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty – Client 代码
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new DemoSocketClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
}
}
Client 端 Handler :
IT知识分享网public class DemoSocketClientHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println(msg);
ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(5000);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
ctx.channel().writeAndFlush("from client:begin talking");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
NioEventLoopGroup 初始化分析
首先根据 Server 服务端代码,分析 NioEventLoopGroup 的初始化过程。而在分析 NioEventLoopGroup 之前,有必要简单的说一说 NioEventLoopGroup 与 NioEventLoop ,方便后续源码的理解。
NioEventLoop 源码分析前了解
NioEventLoop 的继承体系
从 NioEventLoop 的继承体系中可以看到,NioEventLoop 本身就是一个 Executor,并且还是一个 单线程的 Executor。Executor 必然拥有一个 execute(Runnable command)
的实现方法,而 NioEventLoop 的 execute()
实现方法在其父类 SingleThreadEventExecutor 中,找到具体代码:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
这里不细说,但是贴出这段代码主要为了引出 startThread();
这句代码,在跟这句代码会发现,它最终调用了 NioEventLoop 的一个成员 Executor 执行了当前成员的 execute()
方法。对应的成员 io.netty.util.concurrent.SingleThreadEventExecutor#executor
而 executor 成员的初始化也是在当前代码执行时创建的匿名 Executor ,也就是执行到即新建并且执行当前 匿名 executr()
方法。
总结:
- NioEventLoop 本身就是一个 Executor。
- NioEventLoop 内部封装这一个新的线程 Executor 成员。
- NioEventLoop 有两个
execute
方法,除了本身的execute()
方法对应的还有成员属性 Executor 对应的execute()
方法。
备注: 因为这里出现了四个 Executor,为了区分,我们给其新的名称:
NioEventLoop 本身 Executor:NioEventLoop
NioEventLoop 的成员 Executor:子 Executor
NioEventLoopGroup 本身 Executor :NioEventLoopGroup
NioEventLoopGroup 的构造参数 Executor :总Executor
NioEventLoopGroup 的继承体系
看到继承体系可以直接知道 NioEventLoopGroup 也是一个 Executor,并且是一个线程池的 Executor,所以他也有 execute()
方法。对应的实现再其父类之中:io.netty.util.concurrent.AbstractEventExecutorGroup#execute
而这里还需要说到的一点是:在 NioEventLoopGroup 的构造中,再其父类 MultithreadEventExecutorGroup 的构造再次引入了一个新的 Executor,
之所以这里提到这个 Executor,是因为这个 Executor 是对应的 execute()
就是在 NioEventLoop 中的成员 Executor 的 execute()
执行时调用的。也就是下面对应的代码调用。io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)
到这如果不明白,没关系,因为只是为了引入 NioEventLoopGroup 和 NioEventLoop 的对应的两个 Executor,和两个 Executor 对应的两个 execute()
方法。这个后面还会有详细分析。
总结:
- NioEventLoopGroup 是一个线程池线程 Executor。
- NioEventLoopGroup 也封装了一个线程 Executor。
- NioEventLoopGroup 也有两个
execute()
方法。
NioEventLoopGroup 初始化代码分析
上面说了基本的了解内容,下面具体分析,从 NioEventLoopGroup 的初始化进入源码分析。
入口我们直接找 NioEventLoopGroup 的无参构造。
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
// 第二个参数是这个group所包含的executor
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// 第三个参数是provider,其用于提供selector及selectable的channel,
// 这个provider是当前JVM中唯一的一个单例的provider
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 第四个参数是一个选择策略工厂实例
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// 第三个参数是选择器工厂实例
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
跟到此,可以发现无参构造的基本参数被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默认当前CPU逻辑核心数的两倍
,selectorProvide:SelectorProvider.provider()//当前JVM中唯一的一个单例的provider
,SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默认选择策略工厂实例
,chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//选择器工厂实例
。到这里只是基本的初始化参数,重点方法为MultithreadEventExecutorGroup
的构造方法。下面重点分析:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
if (!success) {
// 关闭之前所有已经创建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建一个选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
根据无参构造直接往下跟,可以看到核心部分在最后一个父类的构造里。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
。
再这里完成整个 NioEventLoopGroup 的实例初始化,这里分析下,然后再画个图回顾下。
初始化构造参数中的 Executor 参数,当其为空时,将其初始化
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
首先 newDefaultThreadFactory())
创建默认的线程工厂,有兴趣可以跟进去看看。然后再创建ThreadPerTaskExecutor
线程 Executor 对象。(PS:这里创建的 Executor 就是 NioEventLoopGroup 内的 Executor 对象,并不是当前 NioEventLoopGroup 自身,可以称其为 总 Executor)。
然后可以看到这里创建了一个 children 数组,根据需要创建的线程数创建对应数量的数组。
children = new EventExecutor[nThreads];
因为每个 NioEventLoopGroup 都是 NioEventLoop 的集合,所以这里的 children 数组就是当前 NioEventLoopGroup 的 NioEventLoop。所以 NioEventLoop 的创建的实在 NioEventLoopGroup 初始化的时候。下面看 NioEventLoop 的初始化:
// 逐个创建nioEventLoop实例
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
if (!success) {
// 闭之前所有已经创建好的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop上所执行的任务
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
先整体看这段 NioEventLoop 的创建代码,可以看到整个过程中存在一个成功标志,catch 每个 NioEventLoop 创建完成过程,如果发生异常则将所有已经创建的 NioEventLoop 关闭。重点的代码也就在 NioEventLoop 的创建了。所以我们继续跟:children[i] = newChild(executor, args);
往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild
,因为当前是 NioEventLoopGroup 的创建,所以知道找到子类的 newChild
实现。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
又将之前合并的 args 参数强转回来,继续跟进 NioEventLoop 构造:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建一个selector的二元组
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
这里我们先整体看下,将之前的默认参数初始化到 NioEventLoop 属性中。其中有两处:openSelector()
和 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)
。这里先看父类构造:
往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,这些也可以在 NioEventLoop 的继承体系可以看到:
// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 创建一个收尾队列
tailTasks = newTaskQueue(maxPendingTasks);
}
// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 这是当前NioEventLoop所包含的executor
this.executor = ThreadExecutorMap.apply(executor, this);
// 创建一个任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
这里首先创建的是 SingleThreadEventExecutor ,这里重点需要关注的代码是:
this.executor = ThreadExecutorMap.apply(executor, this);
这里this
是 NioEventLoop ,所以this.executor
就是前面说的 NioEventLoop 里的 Executor,这里我们先称为 子 Executor(子:对应的就是 NioEventLoop ,前面说的 总:对应的是 NioEventLoopGroup )。
而这里 子 Executor 的初始化是由一个 executor
参数的,这个就是前面 NioEventLoopGroup 构造方法一直带入的 总 Executor。那我们继续往下跟,看看这个子 Executor 是如何完成的初始化的。
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 这里创建的executor是子executor
return new Executor() {
// 这个execute()是子executor的execute()
@Override
public void execute(final Runnable command) {
// 这里调用了NioEventLoopGroup所包含的executor的execute()
// 即调用了“总的executor”的execute()
executor.execute(apply(command, eventExecutor));
}
};
}
这段代码细看就会明白,这里创建的 子 Executor的创建也就是一个线程的创建,但是重点却在这个线程 Executor 的 execute()
方法实现,只做了一件事情:就是调用 传入的 总 Executor 的 execute()
方法。所以这里 子 Executor 做的事情就是调用 总 Executor 的 execute()
。不要觉得这里绕,因为这还只是初始化,后面这里执行会更绕。[手动捂脸哭]
其实这里的 apply(command, eventExecutor)
,这里再执行 总 Executor 的 execute()
时还是会记录当前正在执行的线程,并且再执行完成时将当前记录值删除。
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
这里再 NioEventLoop 的属性 Executor 创建完成时,又去创建了一个普通任务队列taskQueue = newTaskQueue(this.maxPendingTasks);
并且还创建了一个收尾任务队列tailTasks = newTaskQueue(maxPendingTasks);
。这几个队列后面会说到。这里继续跟 NioEventLoop 主流程初始化。
到这我们再回去看看 openSelector()
,这里我们要先知道 SelectorTuple :
private static final class SelectorTuple {
final Selector unwrappedSelector; // NIO原生selector
final Selector selector; // 优化过的selector
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector()
方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:
而具体的优化过程有兴趣的可以自己去看看,这里只要知道,若是禁用了优化则 SelectorTuple 的优化后的 Selector 和为优化的 Selector 均为 Nio 原生的 Selector。
而这io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
后面还有在 NioEventLoop 数组创建完成后,还有选择器创建和关闭监听器绑定等,感兴趣可以自己看看,这里不再介绍。
到这一个 NioEventLoop 的创建过程的代码也全部看完了。我想如果只看这个肯定还是有点懵,源码这个东西需要自己跟进去去看,debug 一点点的跟,跟着运行的代码去想为何这么实现,不过这里我也画个图,让大家更直观的了解到 NioEventLoopGroup 的创建流程以及主要操作。
我想大家结合这个图,再结合上面的分析过程,最好可以自己找到源码,跟一遍,应该可以理解 NioEvnetLoopGroup 的创建。
ServerBootstrap与 ServerBootstrap 属性配置分析
继承体系:
入口代码:
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
b.group(bossGroup, workerGroup)
// (非必备)打印日志
.handler(new LoggingHandler(LogLevel.INFO))
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑
p.addLast(new HelloServerHandler());
}
});
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new SomeSocketClientHandler());
}
});
ServerBootstrap与 Bootstrap 都是启动配置类,唯一不同的是,ServerBootstrap是服务端的启动配置类,Bootstrap 则是客户端的启动配置类,主要用于绑定我们创建的 EventLoopGroup,指定 Channel 的类型以及绑定 Channel 处理器等操作,主要做的都是给 ServerBootstrap与 Bootstrap 的属性赋值操作,所以称其为配置类。可以进入 group()
方法里看一眼:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
其他的方法也是一样,感兴趣可以自己进去看看。这里只是初始化,都是为了后面的操作做准备。
服务端 bind 方法 ServerBootstrap.bind() 源码解析
这里我们从这里进入:
b.bind(port).sync();
直接从 bind()
方法跟进去:
// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// 继续跟进
public ChannelFuture bind(SocketAddress localAddress) {
// 验证group与channelFactory是否为null
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// 这里是一处重点逻辑
return doBind(localAddress);
}
这里显示校验了 Bootstrap 的 group 与 channelFactory 是否绑定成功。然后继续跟进 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建、初始化channel,并将其注册到selector,返回一个异步结果
final ChannelFuture regFuture = initAndRegister();
// 从异步结果中获取channel
final Channel channel = regFuture.channel();
// 若异步操作执行过程中出现了异常,则直接返回异步对象(直接结束)
if (regFuture.cause() != null) {
return regFuture;
}
// 处理异步操作完成的情况(可能是正常结束,或发生异常,或任务取消,这些情况都属于有结果的情况)
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 绑定指定的端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 处理异步操作尚未有结果的情况
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 为异步操作添加监听
regFuture.addListener(new ChannelFutureListener() {
// 若异步操作具有了结果(即完成),则触发该方法的执行
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) { // 异步操作执行过程中出现了问题
promise.setFailure(cause);
} else { // 异步操作正常结果
promise.registered();
// 绑定指定的端口
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
首先再这里,我们先把这个方法整体的逻辑搞清楚,然后再再去研究他的每一步具体的操作,画个图,先理解这个方法做了什么:
可以在图中结合代码,找到整个 dobind()
的大局处理思路,然后呢,到这里我们还有很多重点细节需要继续跟进,也就是图中标记的 Tag 1、Tag 2。为了方便后面跟进去代码之后方便回来,这里以此标记,然后下面在具体分析 Tag 标记的源码:
补充 Tag 0 :
ChannelPromise 与 ChannelFuture 了解。
Tag 1 :
异步创建、初始化channel,并将其注册到selector
final ChannelFuture regFuture = initAndRegister();
Tag 2 :
绑定指定的端口号:
doBind0(regFuture, channel, localAddress, promise);
补充 Tag 0:ChannelPromise 与 ChannelFuture
ChannelPromise 是一个特殊的 ChannelFuture,是一个可修改的 ChannelFuture。内部提供了修改当前 Future 状态的方法。在 ChannelFuture 的基础上实现了设置最终状态的修改方法。
而 ChannelFuture 只可以查询当前异步操作的结果,不可以修改当前异步结果的 Future 。这里需要知道的就是 ChannelPromise 可以修改当前异步结果的状态,并且在修改状态是会触发监听器。在 doBind
方法中主要用于在处理异步执行一直未结束的的操作,将异步结果存在异常的时,将异常赋值给 ChannelPromise 并返回。
Tag 1 : initAndRegister() 初始化并注册 Channel
先找到代码:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 将channel注册到selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
嗯?!代码意一看,咋就这么点,也就做了三件事,可是这三件事做的每一个都不是一句代码的可以完成的。这里我们一个一个分析,除了这三件事情,其他的也就是异常后的处理逻辑,所以主流程就是下面的三句代码,也为了跟进继续打上标记吧:
Tag 1.1 创建channel channel = channelFactory.newChannel();
Tag 1.2 初始化channel init(channel);
Tag 1.3 将channel注册到selector ChannelFuture regFuture = config().group().register(channel);
针对这三处,还是要一处一处分析。
Tag 1.1 channelFactory.newChannel() 创建 Channel
找到对应的代码:io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
// 调用无参构造器创建channel
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
这里为什么直接找到 ReflectiveChannelFactory ,需要提一下,在分析 ServerBootstrap与 Bootstrap 启动配置类的时候,设置 channel 的方法,跟进去可以找到针对属性 channelFactory 的赋值代码:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
可以看到这里 new 的就是 ReflectiveChannelFactory 工厂类,然后再看 ReflectiveChannelFactory 的构造:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 将NioServerSocketChannel的无参构造器初始化到constructor
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
看到的是 ReflectiveChannelFactory 在创建时初始化了 constructor 属性,将传入的 channel 类 clazz 中获取构造赋值给了 ReflectiveChannelFactory 反射工厂的 constructor 属性。
而我们再 Server 端传入的 channel 类为NioServerSocketChannel.class
,所以上面看的 constructor.newInstance();
对应的也就是 NioServerSocketChannel 的无参构造。这样我们就继续跟进 NioServerSocketChannel :
// NIO中的provider,其用于创建selector与channel。并且是单例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
// DEFAULT_SELECTOR_PROVIDER 静态变量
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
继续跟进 newSocket()
:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 创建NIO原生的channel => ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
就是返回了一个 Java NIO 原生的 Channel,最后将 NIO 原生的Channel 包装成 NioServerSocketChannel,继续跟进 this(newSocket(DEFAULT_SELECTOR_PROVIDER))
找到有参构造具体代码:
public NioServerSocketChannel(ServerSocketChannel channel) {
// 参数1:父channel
// 参数2:NIO原生channel
// 参数3:指定当前channel所关注的事件为 接受连接
super(null, channel, SelectionKey.OP_ACCEPT);
// 用于对channel进行配置的属性集合
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里主要做了两件事情,1. 调用父类构造,2. 对 channel 进行配置属性集合。
这里先说下 new NioServerSocketChannelConfig(),这部操作就是给当前 Channel 的 config 进行赋值,用来保存当前 Channel 的属性配置的集合。好了,这个说了我们继续跟主线:super(null, channel, SelectionKey.OP_ACCEPT)
// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// 这里的this.ch为NIO原生channel
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// NIO,非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
直接找到 AbstractNioChannel 父类构造,这也第一步也是调用父类构造 super(parent);
先记着,先看除了调用父类构造还做了什么事情:
- 调用父类构造 super(parent);
- 将前面创建的原生 Channel 复制给属性保存 this.ch = ch;
- 当前 channel 的关注事件属性赋值 this.readInterestOp = readInterestOp; // SelectionKey.OP_ACCEPT 接受事件
- 将 NIO 原生 Channel 设置为非阻塞 ch.configureBlocking(false);
在 AbstractNioChannel 构造中就做了这么四件事情,主要需要说的还是其调用父类构造又做了什么事情,找到代码:
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 为channel生成id,由五部分构成
id = newId();
// 生成一个底层操作对象unsafe
unsafe = newUnsafe();
// 创建与这个channel相绑定的channelPipeline
pipeline = newChannelPipeline();
}
在 AbstractChannel 构造中主要做了三件事:
- 为当前 Channel 生成 id
newId()
,感兴趣可以跟进去看看。- 生成一个底层操作对象 unsafe,用于 I/O 线程调用传输时使用,用户代码无法调用。
newUnsafe()
- 创建与这个channel相绑定的channelPipeline,这也是一个重点操作,不过在这里先不展开细说,后面会单独细跟 channelPipeline 的代码。
所以到此 **Tag 1 : initAndRegister() ** 中的 **Tag 1.1 newChannel() ** 创建 Channel 才算跟完。针对 Tag 1.1 newChannel() 我们也画图简图整理下思路:
根据图,在结合上面代码的分析,最好自己再可以跟一遍代码,我想这一块的理解还是没什么问题的。到这也只是创建了 Channel。Tag 1.1 的 Channel 创建结束,接着跟进 Tag 1.2 init(channel).
Tag 1.2 init(channel) 初始化 Channel
这里我们是从 ServerBootstrap 中的doBind 进入的,所以这里直接找到 io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
// 获取serverBootstrap中的options属性
final Map<ChannelOption<?>, Object> options = options0();
// 将options属性设置到channel
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 获取serverBootstrap中的attrs属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
// 遍历attrs属性
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
// 将当前遍历的attr初始化到channel
channel.attr(key).set(e.getValue());
}
}
// 获取channel的pipeline
ChannelPipeline p = channel.pipeline();
// 将serverBootstrap中所有以child开头的属性写入到局部变量,
// 然后将它们初始化到childChannel中
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 将ServerBootstrapAcceptor处理器添加到pipeline
// ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,
// 我们通常称其为连接处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这里做的事情还是很多的,基本操作我在上面注释上也标注出来,还有一些需要继续跟下去的主要操作,还是先标记 Tag 然后继续跟下去。这里说一下这里的 options 与 attrs 属性的赋值,其实就是讲我们 ServerBootstrap 与 Bootstrap 在调用 doBind()
之前通过 option()
与 attr()
设置的参数值,其中 options 属性设置到了 Channel 的 config 属性中,attrs 是直接被设置在了 Channel 上的。
在设置完 options 属性与 attrs 属性时,接着获取了当前 channel 的 pipeline,接下来还是获取我们在 doBind()
之前设置的属性值,以 child 开头的方法 childOption()
与 childAttr()
设置的属性值。
这里使用局部变量记录了所有 Child 相关的值 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs
主要用于初始化 childChannel 的属性,new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs))
主要是创建 连接处理器。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 将ServerBootstrapAcceptor处理器添加到pipeline
// ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,
// 我们通常称其为连接处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
首先这里想做的事情是:将当前 channel 的 pipeline 中绑定一个初始化处理器 ChannelInitializer ,因为是抽象类,所以需要匿名实现 initChannel方法。 而这些主要的操作是处理 childGroup 里面的 channel 的初始化操作。这里我只想主要讲一下这个连接处理器 ServerBootstrapAcceptor 主要做了什么,其他的具体会在后面的 handler 和 pipeline 的时候细说。
**补充:**这里因为 ServerBootstrap 服务端是对用的有两个 EventLoopGroup,在服务端,parentGroup 是用于接收客户端的连接,在 parentGroup 接收到连接之后是将只是将当前转给了 childGroup去处理后续操作,而 childGroup 是用来专门处理连接后的操作的,不关心 channel 的连接任务。这个其实就是 Netty-Server 的 Reactor 线程池模型的处理逻辑。
这里主要往下说一下这个连接处理器: ServerBootstrapAcceptor 。
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
ServerBootstrapAcceptor 构造只是将 ServerBootstrap 中配置的 Child 属性设置保存下来。而这里一直说这是连接处理器,是因为当客户端连接发送到服务端时,这个处理器会接收客户端的连接并处理。主要是处理方法是 channelRead 中的实现:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg为客户端发送来的数据,其为NioSocketChannel,即子channel,childChannel
final Channel child = (Channel) msg;
// 将来自于ServerBootstrap的child开头属性初始化到childChannel中(childHandler、childOptions、childAttrs)
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将childChannel注册到selector 需要注意的是,这里的selector与父channel所注册的selector不是同一个
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
这里主要就做了两件事情:
- 初始化 childChannel
- 将成功从 client 连接过来的 channel 注册到 selector 上。
这里一直说子channel,就是因为这里注册的是两个 EventLoopGroup,在 Server 端的处理上 netty 线程模型采用“服务端监听线程”和“IO线程”分离的方式。所以这里 channelRead
方法就是在 client 端请求连接到 server 端时,用于将当前连接的 IO 线程绑定到 childChannel 同时注册到 ChildGroup 中的 Selector 中。线程,模型可以参考下面的图:
好了,到这里 **Tag 1.2 initChannel ** 代码也分析完了,有些关于 pipeline 、handler、selector 的部分没有细说因为后面会单独说,在这里没有直接展开。
这里也画个图:到时候将这些图在整合到一起,现在是的分析过程就像是化整为零,最后在整合到一起化零为整。
这里除了 init(channel) 方法之外,还主要说了下 ServerBootstrapAcceptor 连接处理器。其实主要是 netty-server 的线程模型与代码的结合理解。
本文太长了,导致会超出大部分博客网站的字数限制,所以我分上下发了
我是敖丙,你知道的越多,你不知道的越多,感谢各位人才的:点赞、收藏和评论,我们下期见!
文章持续更新,可以微信搜一搜「 三太子敖丙 」第一时间阅读,回复【资料】有我准备的一线大厂面试资料和简历模板,本文 GitHub github.com/JavaFamily 已经收录,有大厂面试完整考点,欢迎Star。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/13242.html