NIO 与 AIO 模型
NIO(一般用同步非阻塞模式,如果阻塞就是 BIO 了):服务器实现模式为一个请求一个线程,客户端发送连接请求全部注册到多路复用器上,多路复用器轮询到连接有 I/O 请求时才启动一个线程进行处理(解决了连接多而 I/O 少时资源占用问题)。
AIO(异步非阻塞):服务器实现模式为一个有效请求一个线程,客户端I/O请求由OS先完成再通知服务器应用启动线程进行处理。
那么,为什么 Netty 选择 NIO 模型呢?
由于 UNIX 系统上 AIO 不成熟,底层仍然使用 EPOLL,没有很好实现 AIO,且 JDK 加了一层封装,因此实际速度并不比 NIO (epoll) 快。
Netty 整体架构为 Reactor 模型,AIO 为 Proactor 模型,整合起来复杂且冗余度高。
AIO 接收数据需要预分配内存,NIO 接收时才分配。AIO 对连接数量高但流量小的情况内存浪费大。
Netty常见应用场景
-
作为 RPC 框架的网络通信工具
分布式系统不同服务节点之间相互调用需要 RPC 框架
-
实现简单的 HTTP 服务器
-
实现即时通讯系统
-
实现消息推动系统
扩展:Java NIO
Selector
多路复用器
Selector
建立在非阻塞基础上。
Channel
被注册到Selector
上,FileChannel
不支持非阻塞channel.configureBlocking(false); // 注册 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
第二个参数表明感兴趣事件,通过掩码形式传入:
SelectionKey.OP_READ
对应 00000001,通道中有数据可以进行读取
SelectionKey.OP_WRITE
对应 00000100,可以往通道中写入数据
SelectionKey.OP_CONNECT
对应 00001000,成功建立 TCP 连接
SelectionKey.OP_ACCEPT
对应 00010000,接受 TCP 连接
如果要监听多个事件,指定多位即可
调用
select()
方法获取通道信息,用于判断是否又感兴趣事件发生异步 IO
Java 异步 IO 提供了返回
Future
实例和使用回调函数CompletionHandler
两种方式。
扩展:Linux Epoll
Linux 下的 Epoll 实例(
epfd
通过本地方法epoll_create
创建)用文件描述符表示,程序中注册的 Socket Channel 都会放到 Selector(Epoll)内部的channel
集合中。int epoll_create(int size); // 创建epoll实例,返回文件描述符用于epoll接口后续调用
当多路复用器进行
select
,通过本地方法epollCtl
将事件注册到epfd
上进行监听。int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // 用文件描述符epfd引用的epoll实例,对目标文件描述符fd执行op操作
本地方法
epollWait
阻塞等待读写事件发生,如果发生则由 OS 通过中断程序调用回调函数放到 Epoll 内的就绪事件列表rdlist
。因此如果rdlist
中已经有了 socket 引用,epoll_wait
返回,如果为空则阻塞进程。Epoll Select Poll 三种底层机制的差异
select 基于对所有
channel
的轮询遍历,每次调用都进行线性遍历,时间复杂度 $O(n)$ ,由于在数组上实现,最大连接有上限。poll 同样是对
channel
进行轮询,但在链表上实现,最大连接无上限。以上两种方法的共同问题是——要直到哪几个通道准备好了,需要自己进行一次遍历。
epoll 通过回调,底层用哈希表实现,每当有 IO 事件就绪,系统注册的回调函数被调用,时间复杂度 $O(1)$ 。
Netty 支持的三种 Reactor 线程模型
Reactor 单线程模型
所有 I/O 操作在同一 NIO 线程上完成,NIO 线程有以下任务:
- 作为 Server ,接收 Client 的 TCP 连接
- 作为 Client ,向 Server 发起 TCP 连接
- 读取通信对端的请求或应答消息
- 向通信对端发送消息请求或应答消息
如上图所示,套接字分离、Accept 新连接、分派请求等全部由一个线程完成,对于小容量场景可行,但高负载下一定无法支撑,会造成大量消息积压、大量超时重发请求进一步拥塞系统资源,造成节点故障。
Reactor 多线程模型
I/O 操作由一组 NIO 线程协作完成:
- 专门的 NIO 线程 —— Acceptor 用于监听 Server ,接收 Client 的 TCP 连接请求;
- 网络 I/O 操作——读、写等由一个 NIO 线程池负责,包含一个任务队列和 N 各可用线程,由这些 NIO 线程负责消息的读取、解码、编码和发送;
- 一个 NIO 线程处理多条连接,而一个连接只对应一个 NIO 线程,防止发生并发操作问题。
相比第一种模式,多线程模型已经极大减轻了 Reactor 线程的工作量,但是如果有百万数量的并发连接,而 Reactor 需要对握手信息进行安全认证,这则非常损耗性能。因此这一部分工作可以再分。
主从 Reactor 多线程模型
再将新创建 Channel 注册到线程池这一工作分离出来,交给“从 Reactor”做,主 Reactor 仅负责完成登录、握手、安全认证,一旦链路成功,将链路注册到给后端 从 Reactor 线程,由它分发到后续线程池进行 I/O 操作。通常,从 Reactor 个数可与 CPU 个数相同(实际上,如果我们在 Netty 起服务端的时候调用默认的无参构造方法 NioEventLoopGroup()
构造 workerGroup
,会起 NettyRuntime.availableProcessors() * 2
,也就是两倍 CPU 核数的线程,作为从 Reactor 线程池)。
除上述三种之外,Netty NIO 的默认模式其实是在主从 Reactor 基础上去掉线程池,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认 NioWorker的个数是 Runtime.getRuntime().availableProcessors()
)。在处理新来的请求时,NioWorker读完已收到的数据到 ChannelBuffer
中,之后触发 ChannelPipeline
中的 ChannelHandler
流。
Netty 是事件驱动的,可以通过 ChannelHandler
链来( ChannelPipline
)控制执行流向。因为ChannelHandler
链的执行过程在 subReactor 中是同步的,所以如果业务处理 handler 耗时长,将严重影响可支持的并发数,例如涉及数据库操作或其它阻塞交互模块时这些问题就会被放大,必须回到第三种模型上,通过线程池化解决。 Netty内置的ChannelHandler实现类–ExecutionHandler
可以满足,因为仍然是 Handler
,仍然可以加入到 Pipeline
中,对使用者来说只是添加一行代码而已。
对于 ExecutionHandler
需要的线程池模型,Netty提供了两种可选:
1) MemoryAwareThreadPoolExecutor 可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限;
2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一Channel中的事件都在一个线程中执行(通常也没必要)。一般来说,OrderedMemoryAwareThreadPoolExecutor 是个很不错的选择,当然,如果有额外需要,也可以自行实现。
Netty 中会起多少线程
ServerBootstrap
启动时,通常 bossGroup
只需设置为 1,ServerSocketChannel
在初始化阶段也只会注册到一个 EventLoop
上,用不到多个线程。
而 IO 线程,为了充分利用 CPU,减少线程上下文切换开销,通常设置为 CPU 核数的两倍(我们知道英特尔的超线程技术,逻辑线程的个数也通常是 CPU 核数的两倍,猜测都是出于利用 CPU 性能的考虑)。
Netty 事件驱动机制
- 事件队列(event queue)接收事件入口,存储待处理事件
- 分发器(event mediator)将不同事件分发到不同业务逻辑单元
- 事件通道(event channel)为分发器与处理器之间的联系渠道
- 事件处理器(event processor)实现业务逻辑,处理完成后会发出事件,触发下一步操作
ChannelPipeline
是ChannelHandler
的集合,类似拦截器概念,Channel
作为网络操作的抽象类,是ChannelEvent
的生产者,ChannelEvent
是数据或状态的载体。
所有事件都来自 ChannelEvent
接口,涵盖监听接口、建立连接、读写数据等网络通讯各个阶段。事件处理者为 ChannelHandler
,连接处理、协议编解码、超时等机制都通过 Handler
完成。
这种响应模式就类似于 AWT 中的 Reactor Pattern。
Netty 的无锁化串行理念
其实 Redis 也是这样,我们知道并行处理可以提升并发性能,但是如果访问处理不当会带来严重的锁竞争,轻者带来部分效率损耗,重者整体性能下降。串行化设计,即消息的处理在同一线程内完成,期间不进行线程切换,避免竞争和同步锁。这对于 Redis、Netty 这样以简单高效为重的低层中间件非常有利。
不同之处在于,Redis 单机下就是单线程的,而 Netty 本身默认就是多线程并行,只是每一 NioEventLoop
中通过 ChannelPipeline
处理消息,除非配置异步 Handler
否则不进行线程处理,从性能角度看是最优的。
在实际使用中也要注意这一点,不要阻塞 EventLoop
。在耗时操作时,尽量使用 Future
,同时也尽量减少锁的使用。
Netty 核心组件
这一部分主要介绍 Netty 中核心组件的功能,引用框中会补充一些实际使用中的调优技巧.
EventLoopGroup
如图所示,两个 EventLoopGroup
实际上也就是两个线程池,Boss
仅负责接收连接,只需要一个线程,内部封装有一个Selector
,Worker
负责具体 IO 处理,每个都有绑定的Selector
。
EventloopGroup
将为每个新创建的 Channel
分配一个 EventLoop
。每个 Channel
的整个生命周期内,所有操作都由相同的 Thread
执行。
Channel
Channel
为 Netty
网络操作(读写等操作)抽象类,EventLoop
负责处理注册到其上的 Channel
的 I/O 操作,两个组件配合进行 I/O 操作。Channel
对 Java 原生的 ServerSocketChannel
和 SocketChannel
进行封装,得到了 NioServerSocketChannel
和 NioSocketChannel
,UDP 对应的是 NioDatagramChannel
。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)......
如上图所示,channel
通过传一个 .class
对象给 Bootstrap
,很明显是工厂模式通过反射方式来创建实例,这一实例的实际实例化时机,也就是源码中 .newChannel()
调用,是在 Bootstrap
进行 bind(PORT)
(对服务端而言)和 connect(HOST, PORT)
(对客户端而言)时。
对于两个指定端点可以使用唯一
Channel
,在第一次创建后保存Channel
,对同一 IP 地址下一次通信中复用而不需要重新建立。但这样做也需要保存不同 IP 的Channel
,在初始化时可能存在一些并发问题,很多实际项目都有相应解决方法,https://mp.weixin.qq.com/s/JRsbK1Un2av9GKmJ8DK7IQ 介绍了一些解决方案。
Handler
handler
其实就是一种 AOP,负责接收到请求后的处理过程。通过 childHandler()
和 ChannelInitializer
可以指定多个 handler
组成 Pipeline
,类似拦截器概念,涉及 handler
的执行顺序。
Bootstrap
通过传入一个 ChannelInitializer
的实现类,在这个实现类中向 ChannelPipeline
中添加一系列 Handler
,这些 Handler
分别负责信息处理的某个环节。
以在 Netty 中进行 SSL 通信为例,首先加入的是
SslContext
实体提供的 Handler,用于进行加解密;然后需要加入一个DelimiterBasedFrameDecoder
,这个编解码器通过分隔符拆分解决包尺寸过大造成的 TCP 粘包等问题;之后就是数据的编解码器,如 Netty 为 String 通信提供的StringDecoder
和StringEncoder
;最后便是我们业务自己的 Handler ,用于按业务逻辑处理消息。
Future
Future<V>
接口继承自 java.util.concurrent.Future<V>
,同样用于异步调用。增加了 sync()
和 await()
用于阻塞等待,还添加了 Listeners
用于任务结束后回调。
Promise
接口继承自 Future
,实例内部是一个任务,其中的 setSuccess(V result)
和 setFailure(Throwable t)
会在执行任务的线程完成后调用。
这一回调可能是 Listeners
回调函数进行(不一定是由线程自己执行,也可能是新线程或其他线程),也可能是从 await()
中返回。
Netty 中的零拷贝
在 Bootstrap
配置参数的时候,使用 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
来指定一个池化的 Allocator,并且使用 ByteBuf buf = allocator.directBuffer()
来获取 Bytebuf
。
PooledByteBufAllocator
会帮你复用(无需 release
,除非你后面还需要用到同一个 bytebuf
)而不是每次都重新分配 ByteBuf
。在IO操作中,Netty 接收和发送 ByteBuffer 采用直接内存进行Socket读写而不是JVM的堆空间,避免了在发送数据时,从JVM到直接内存的拷贝过程(文件传输采用transferTo
,直接将缓冲区数据发到Channel,不存在循环write方式涉及的内存拷贝),这也就是 Zero Copy 的含义。Java NIO 中也有 Zero Copy Buffer 技术。
同时,Netty 的组合Buffer对象可以聚合多个 ByteBuffer
对象,方便操作,避免通过内存拷贝将小Buffer合并成大的。