本篇主要介绍Java NIO的基本原理和主要组件
Netty是由JBOSS提供的Java开源网络应用程序框架,其底层是基于Java提供的NIO能力实现的。因此为了掌握Netty的底层原理,需要首先了解Java NIO的原理。
计算机主要由CPU、内存、外存、IO设备等硬件组成,计算机执行计算的过程就是CPU从内存中获取数据,进行计算,然后再将计算结果写入内存中。但由于内存非常昂贵且下电后数据会丢失,计算机需要使用外存来持久化存储大规模的数据,外存提供了大量的存储空间,代价是其存取速度远小于内存。除了读取外存数据,计算机还可以从网络设备获取网络中的数据,受网络传输速度的限制,计算机获取网络数据的速度也远小于其读取内存的速度。
对于存取这些低速IO设备,操作系统(如Linux)提供了5种不同的IO模型。对于Java来说,其最先提供的就是基于最简单的阻塞式IO模型实现的BIO(Blocking IO)库。当调用BIO库读取硬盘中的数据时,用户进程会一直被阻塞在读数据的接口上,直到数据被操作系统从硬盘中获取出来并返回给用户,这时用户进程才能继续向下执行。由于读取硬盘速度相比CPU计算速度慢很多,进程就会一直被卡在读取数据这里,用户体验就是进程没有响应。即使CPU处于空闲状态,也无法使用CPU进行其他工作。这就浪费了大量的资源,同时也给用户造成了不好的体验。
除了最传统的阻塞式IO,操作系统还提供了其他几种改进的IO模型,总体思想都是尽可能减少用户进程阻塞在IO上的时间,在进行慢速设备IO时,进程无需等待,可以继续处理其他指令,当数据获取完成时操作系统再通知用户进程可以进行后续的数据处理操作。因此Java在1.4版本后就推出了一套新的IO接口NIO(New IO),这套IO接口基于多路复用IO模型,提供了非阻塞的IO能力,因此NIO中的N也可以理解为Non-blocking。这套NIO接口实现了只用一个线程来轮询等待所有应用进程的IO就绪状态,当某个应用进程的IO状态就绪时,再通知对应进程进行数据读写的操作。这就避免了每个应用进程在IO时被阻塞,为开发高性能和高并发的应用提供了关键能力。
Channel 是 NIO 的核心概念,它表示一个打开的连接,是数据读写的双向通道,这个连接可以连接到 I/O 设备(例如:磁盘文件,Socket)或者一个支持 I/O 访问的应用程序,Java NIO 使用缓冲区和通道来进行数据传输。
Channel的主要实现类有:
FileChannel只能工作在阻塞模式下,不能配合Selector
FileChannel不能直接打开,必须通过FileInputStream
,FileOutputStream
或RandomAccessFile
来获取,它们都有getChannel()
方法
FileInputStream
获取的channel只能读FileOutputStream
获取的channel只能写RandomAccessFile
是否能读写根据构造时的读写模式决定效率相比使用流式方式拷贝数据高很多,底层使用了操作系统提供的零拷贝特性。
一次最多传输2g的数据
Buffer是NIO的另一个核心概念,NIO库操作数据都是通过缓冲区处理的,在数据读写的过程都要先经过缓冲区,然后再从缓冲区中按照块来处理数据。
从类图中可以看到,7 种数据类型对应着 7 种子类,这些名字是 Heap 开头子类,数据是存放在 JVM 堆中的。
MappedByteBuffer
存放在堆外直接内存中,可以与文件进行映射。
通过java.nio包和MappedByteBuffer允许Java程序直接从内存中读取文件内容,通过将整个或部分文件映射到内存,由操作系统来处理加载请求和写入文件,应用只需要和内存打交道,这使得IO操作非常快。
Mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。
channel.read(buf)
flip()
切换成读模式buffer.get()
clear()
或compact()
切换为写模式@Slf4j
public class TestByteBuffer {
public static void main(String[] args) {
try (FileInputStream fileInputStream = new FileInputStream("data.txt");
FileChannel channel = fileInputStream.getChannel()) {
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true) {
int len = channel.read(buffer);
if (len < 0) {
break;
}
log.info("读到的字符数:{}", len);
buffer.flip();
while (buffer.hasRemaining()) {
byte b = buffer.get();
log.info("读到的字符:{}", (char) b);
}
buffer.clear();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Java规定内码使用UTF-16编码,一个字符占用2个字节,也就是Java中的char类型在内存中是使用UTF-16的编码形式存储的。而我们代码中读取的文件data.txt使用的是UTF-8编码格式,因此对于其中的英文字符和数字,一个字符只占用一个字节。因此代码中我们可以每次读取一个字节,然后再把它转换成Java内部的char。
Buffer是由特定基本类型的元素组成的线性有限序列,除了Buffer里面的内容,其最重要的属性就是它的capacity
,limit
和position
。
capacity
:Buffer中可以储存的元素数量,Buffer的capacity
不能为负值也永远不会改变。limit
:Buffer中第一个不能被读取或写入的元素的位置,limit
不能为负值也不能大于capacity
。position
:Buffer中下一个将要被读取或写入的元素的位置,position
不能为负值也不能大于limit
。ByteBuffer.allocate(10)
的时候,我们初始化了一个HeapByteBuffer
对象,并将其capacity
和limit
均设置为10,position
被设置为0。// ByteBuffer.java
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
// HeapByteBuffer.java,调用的HeapByteBuffer中的构造方法
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
}
channel.read(buf)
向Buffer中写入数据时,根据源码分析,其最终会调入ByteBuffer
的put()
方法中,HeapByteBuffer
对其的实现如下,nextPutIndex()
方法检查当前position是否大于等于limit,如果小于limit,则将原position返回,并将原position加1。// HeapByteBuffer.java
public ByteBuffer put(byte x) {
hb[ix(nextPutIndex())] = x;
return this;
}
flip()
方法,所谓将Buffer切换为读模式,其实源码中就是将position和limit的位置重新赋值。如此操作后,position就是我们读取数据的起点,limit就是我们读取数据的终点。// Buffer.java
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
buffer.get()
方法来获取一个字节,通过源码可以看出,nextGetIndex()
方法检查当前position是否大于等于limit,如果小于limit,则将原位置返回,并将原位置加1。// HeapByteBuffer.java
public byte get() {
return hb[ix(nextGetIndex())];
}
当读取所有数据后,可以调用buffer.clear()
方法或buffer.compact()
方法将Buffer切换为写模式。
clear()
: 直接将Buffer重置为初始状态,忽略还没有读完的数据。compact()
:将还没读完的数据复制到缓冲区头部,然后从没读完的数据后可以开始写入新的数据// Buffer.java
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
// HeapByteBuffer.java
public ByteBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining()); // 将还没读完的数据拷贝到数组头部
position(remaining()); // 将position重置为剩余待读数据之后
limit(capacity()); // 将limit重置为capacity
discardMark();
return this;
}
mark
是Buffer中的另一个属性,它的主要用途是记录一个position的位置,后续调用reset()
方法后会将position重置到mark的位置。mark
可以不被定义,但如果设置了mark
的值,则它不能为负值且不能大于position
的值。
比如我们想要发送三行数据
Hello world.\n
It is my life.\n
I love you.\n
为了提高发送效率,通常我们会将这三行字符串合并到一个Buffer中进行发送。另一端在接收到消息时,由于协议并不理解消息的内容,因此用户在读取数据时,有可能读取出来如下两个包。
Hello world.\nIt is my life.\nI Lo
ve you.\n
这里第一个包出现了原来的两条数据在一个包中的情况,这就叫做粘包。第一行最后的数据将原来的一条数据截断了,这就叫做半包。
我们可以通过如下方式处理粘包和半包的问题。
public class TestStickyAndHalfPackage {
public static void main(String[] args) {
ByteBuffer p1 = ByteBuffer.allocate(64);
p1.put("Hello world.\nIt is my life.\nI lo".getBytes());
split(p1);
p1.put("ve you.\n".getBytes());
split(p1);
}
private static void split(ByteBuffer buffer) {
buffer.flip();
for (int i = 0; i < buffer.limit(); ++i) {
if (buffer.get(i) == '\n') {
int len = i + 1 - buffer.position();
ByteBuffer line = ByteBuffer.allocate(len);
for (int j = 0; j < len; ++j) {
line.put(buffer.get());
}
line.flip();
System.out.print(StandardCharsets.UTF_8.decode(line));
}
}
buffer.compact(); // 将没有读完的数据移动到buffer头部,这里是处理半包和粘包的关键
}
}
概念继承于操作系统IO模型中多路复用IO模型中的selector,其主要作用是,用户可以把所有读写Channel都注册在某个Selector上,Selector会不断的轮询注册在上面的所有channel,如果某个channel为读写等事件做好准备,那么就处于就绪状态,通过Selector可以不断轮询发现出就绪的channel,进行后续的IO操作。为何要做这种设计呢?
如果每一个Channel都需要一个线程来为其IO过程提供服务,则会占用大量的内存,CPU需要在很多线程间进行切换,有太多额外开销,而且随着连接数量增加,线程数量会达到上限,无法支持大连接数。
有一种解决方案是使用有固定线程数量的线程池来处理所有连接请求,但线程池中的线程一旦被占用,就要阻塞等待IO完成才能被其他连接使用,如果IO请求花费时间很长,那会导致后续的大量IO请求需要排队等待。这种情况只适合处理短连接比较多的场景。
针对连接数量非常多,数据流量比较少的场景,多路复用的IO模型就比较适合。如下图所示,每一个Channel可以把自己注册到一个独立运行的Selector线程中,这个Selector线程会轮询所有Channel的读写状态,当发现一个就绪的Channel时,就可以使用工作线程为这个Channel提供服务。这样工作线程就不需要阻塞在某一个Channel上,只有真正要进行数据读写时才分配给某个Channel,极大提高了线程的利用率。
@Slf4j
public class Server {
public static void main(String[] args) {
try (Selector selector = Selector.open(); ServerSocketChannel scc = ServerSocketChannel.open();) {
scc.configureBlocking(false);
scc.bind(new InetSocketAddress(8080));
final SelectionKey sccKey = scc.register(selector, 0, null);
sccKey.interestOps(SelectionKey.OP_ACCEPT); // 配置这个channel只关注accept事件
while (true) {
selector.select();
final Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // 每次准备就绪的channel会被放到selectedKeys这个集合中,但删除,需要手动删除
while (iter.hasNext()) {
final SelectionKey key = iter.next();
log.debug("key: {}", key);
iter.remove(); // 手动从集合中删除处理过的key
if (key.isAcceptable()) {
final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
final SocketChannel sc = ssc.accept();
sc.configureBlocking(false); // 使用selector时,channel都需要配置成非阻塞的
log.debug("accept: {}", sc);
final SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {
try {
final SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
final int read = sc.read(buffer);
if (read < 0) { // 客户端调用close()正常关闭时,会生成一次READ事件,实际read的返回值是-1,这里处理客户端正常退出
log.warn("cancel {}", key);
key.cancel();
} else {
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
}
log.debug("after read: {}", sc);
} catch (IOException e) { // 这里处理客户端异常退出
e.printStackTrace();
key.cancel();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}