Netty(1)——NIO基础

NIO,Netty,基础 · 浏览次数 : 144

小编点评

代码设计中涉及到以下几个方面: * **Selector**:Selector是NIO消息服务器中用于处理多个channel的 selector,它负责轮询channel,当发现某个channel就绪时,就会将其加入到Selector中,并从Selector中获取对应的channel。 * **Channel**:Channel是NIO消息服务器中用于处理单个channel的通道,它负责接收来自客户端的读写数据,并将数据发送到客户端。 * **NioMessage**:NioMessage是NIO消息服务器中用于处理channel之间的消息,它负责将消息从一个channel传递到另一个channel。 代码设计中使用了以下技术: * **多线程**:为了处理多个channel的请求,代码使用了多线程。 * **Selector**:Selector是用于处理多个channel的 selector,它轮询channel,当发现某个channel就绪时,就会将其加入到Selector中,并从Selector中获取对应的channel。 * **Channel**:Channel是用于处理单个channel的通道,它负责接收来自客户端的读写数据,并将数据发送到客户端。 * **NioMessage**:NioMessage是用于处理channel之间的消息,它负责将消息从一个channel传递到另一个channel。 代码设计中使用了Selector来轮询channel,Selector负责当发现某个channel就绪时,将其加入到Selector中,并从Selector中获取对应的channel。这样可以保证在每个channel上只有一个线程负责处理消息,并提高效率。

正文

本篇主要介绍Java NIO的基本原理和主要组件

Netty是由JBOSS提供的Java开源网络应用程序框架,其底层是基于Java提供的NIO能力实现的。因此为了掌握Netty的底层原理,需要首先了解Java NIO的原理。

NIO简介

计算机主要由CPU、内存、外存、IO设备等硬件组成,计算机执行计算的过程就是CPU从内存中获取数据,进行计算,然后再将计算结果写入内存中。但由于内存非常昂贵且下电后数据会丢失,计算机需要使用外存来持久化存储大规模的数据,外存提供了大量的存储空间,代价是其存取速度远小于内存。除了读取外存数据,计算机还可以从网络设备获取网络中的数据,受网络传输速度的限制,计算机获取网络数据的速度也远小于其读取内存的速度。

对于存取这些低速IO设备,操作系统(如Linux)提供了5种不同的IO模型。对于Java来说,其最先提供的就是基于最简单的阻塞式IO模型实现的BIO(Blocking IO)库。当调用BIO库读取硬盘中的数据时,用户进程会一直被阻塞在读数据的接口上,直到数据被操作系统从硬盘中获取出来并返回给用户,这时用户进程才能继续向下执行。由于读取硬盘速度相比CPU计算速度慢很多,进程就会一直被卡在读取数据这里,用户体验就是进程没有响应。即使CPU处于空闲状态,也无法使用CPU进行其他工作。这就浪费了大量的资源,同时也给用户造成了不好的体验。

除了最传统的阻塞式IO,操作系统还提供了其他几种改进的IO模型,总体思想都是尽可能减少用户进程阻塞在IO上的时间,在进行慢速设备IO时,进程无需等待,可以继续处理其他指令,当数据获取完成时操作系统再通知用户进程可以进行后续的数据处理操作。因此Java在1.4版本后就推出了一套新的IO接口NIO(New IO),这套IO接口基于多路复用IO模型,提供了非阻塞的IO能力,因此NIO中的N也可以理解为Non-blocking。这套NIO接口实现了只用一个线程来轮询等待所有应用进程的IO就绪状态,当某个应用进程的IO状态就绪时,再通知对应进程进行数据读写的操作。这就避免了每个应用进程在IO时被阻塞,为开发高性能和高并发的应用提供了关键能力。

NIO的3个核心组件

  • Channel
  • Buffer
  • Selector

Channel(通道)

image

Channel 是 NIO 的核心概念,它表示一个打开的连接,是数据读写的双向通道,这个连接可以连接到 I/O 设备(例如:磁盘文件,Socket)或者一个支持 I/O 访问的应用程序,Java NIO 使用缓冲区和通道来进行数据传输。

Channel的主要实现类有:

  • FileChannel(读写文件)
  • DatagramChannel(UDP编程)
  • SocketChannel(TCP编程)
  • ServerSocketChannel(TCP编程)

FileChannel

FileChannel只能工作在阻塞模式下,不能配合Selector

FileChannel不能直接打开,必须通过FileInputStreamFileOutputStreamRandomAccessFile来获取,它们都有getChannel()方法

  • 通过FileInputStream获取的channel只能读
  • 通过FileOutputStream获取的channel只能写
  • 通过RandomAccessFile是否能读写根据构造时的读写模式决定

transferTo方法

效率相比使用流式方式拷贝数据高很多,底层使用了操作系统提供的零拷贝特性。

一次最多传输2g的数据

Buffer(缓冲区)

Buffer是NIO的另一个核心概念,NIO库操作数据都是通过缓冲区处理的,在数据读写的过程都要先经过缓冲区,然后再从缓冲区中按照块来处理数据。

image

从类图中可以看到,7 种数据类型对应着 7 种子类,这些名字是 Heap 开头子类,数据是存放在 JVM 堆中的。

MappedByteBuffer

MappedByteBuffer存放在堆外直接内存中,可以与文件进行映射。

通过java.nio包和MappedByteBuffer允许Java程序直接从内存中读取文件内容,通过将整个或部分文件映射到内存,由操作系统来处理加载请求和写入文件,应用只需要和内存打交道,这使得IO操作非常快。

Mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。

ByteBuffer的正确使用方式

  1. 向buffer中写入数据,例如channel.read(buf)
  2. 调用flip()切换成读模式
  3. 从buffer中读取数据,例如buffer.get()
  4. 调用clear()compact()切换为写模式
  5. 重复1-4
点击查看代码
@Slf4j
public class TestByteBuffer {
    public static void main(String[] args) {
        try (FileInputStream fileInputStream = new FileInputStream("data.txt");
             FileChannel channel = fileInputStream.getChannel()) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while (true) {
                int len = channel.read(buffer);
                if (len < 0) {
                    break;
                }
                log.info("读到的字符数:{}", len);
                buffer.flip();
                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    log.info("读到的字符:{}", (char) b);
                }
                buffer.clear();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Java规定内码使用UTF-16编码,一个字符占用2个字节,也就是Java中的char类型在内存中是使用UTF-16的编码形式存储的。而我们代码中读取的文件data.txt使用的是UTF-8编码格式,因此对于其中的英文字符和数字,一个字符只占用一个字节。因此代码中我们可以每次读取一个字节,然后再把它转换成Java内部的char。

Buffer的结构

Buffer是由特定基本类型的元素组成的线性有限序列,除了Buffer里面的内容,其最重要的属性就是它的capacitylimitposition

  • capacity:Buffer中可以储存的元素数量,Buffer的capacity不能为负值也永远不会改变。
  • limit:Buffer中第一个不能被读取或写入的元素的位置,limit不能为负值也不能大于capacity
  • position:Buffer中下一个将要被读取或写入的元素的位置,position不能为负值也不能大于limit

三个属性是如何控制读写过程的

  1. 通过源码可以看出,调用ByteBuffer.allocate(10)的时候,我们初始化了一个HeapByteBuffer对象,并将其capacitylimit均设置为10,position被设置为0。
点击查看代码
// ByteBuffer.java
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}
// HeapByteBuffer.java,调用的HeapByteBuffer中的构造方法
HeapByteBuffer(int cap, int lim) {            // package-private
    super(-1, 0, lim, cap, new byte[cap], 0);
}

image

  1. 当调用channel.read(buf)向Buffer中写入数据时,根据源码分析,其最终会调入ByteBufferput()方法中,HeapByteBuffer对其的实现如下,nextPutIndex()方法检查当前position是否大于等于limit,如果小于limit,则将原position返回,并将原position加1。
点击查看代码
// HeapByteBuffer.java
public ByteBuffer put(byte x) {
    hb[ix(nextPutIndex())] = x;
    return this;
}

image

  1. 当写入完成后,我们调用flip()方法,所谓将Buffer切换为读模式,其实源码中就是将position和limit的位置重新赋值。如此操作后,position就是我们读取数据的起点,limit就是我们读取数据的终点。
点击查看代码
// Buffer.java
public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

image

  1. 切换读模式后,就可以调用buffer.get()方法来获取一个字节,通过源码可以看出,nextGetIndex()方法检查当前position是否大于等于limit,如果小于limit,则将原位置返回,并将原位置加1。
点击查看代码
// HeapByteBuffer.java
public byte get() {
    return hb[ix(nextGetIndex())];
}

image

  1. 当读取所有数据后,可以调用buffer.clear()方法或buffer.compact()方法将Buffer切换为写模式。

    • clear(): 直接将Buffer重置为初始状态,忽略还没有读完的数据。
    • compact():将还没读完的数据复制到缓冲区头部,然后从没读完的数据后可以开始写入新的数据
点击查看代码
// Buffer.java
public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

// HeapByteBuffer.java
public ByteBuffer compact() {
    System.arraycopy(hb, ix(position()), hb, ix(0), remaining());  // 将还没读完的数据拷贝到数组头部
    position(remaining());  // 将position重置为剩余待读数据之后
    limit(capacity());  // 将limit重置为capacity
    discardMark();
    return this;
}

image

mark和reset

mark是Buffer中的另一个属性,它的主要用途是记录一个position的位置,后续调用reset()方法后会将position重置到mark的位置。mark可以不被定义,但如果设置了mark的值,则它不能为负值且不能大于position的值。

粘包和半包

比如我们想要发送三行数据

Hello world.\n
It is my life.\n
I love you.\n

为了提高发送效率,通常我们会将这三行字符串合并到一个Buffer中进行发送。另一端在接收到消息时,由于协议并不理解消息的内容,因此用户在读取数据时,有可能读取出来如下两个包。

Hello world.\nIt is my life.\nI Lo
ve you.\n

这里第一个包出现了原来的两条数据在一个包中的情况,这就叫做粘包。第一行最后的数据将原来的一条数据截断了,这就叫做半包。

我们可以通过如下方式处理粘包和半包的问题。

点击查看代码
public class TestStickyAndHalfPackage {
    public static void main(String[] args) {
        ByteBuffer p1 = ByteBuffer.allocate(64);
        p1.put("Hello world.\nIt is my life.\nI lo".getBytes());
        split(p1);
        p1.put("ve you.\n".getBytes());
        split(p1);
    }

    private static void split(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); ++i) {
            if (buffer.get(i) == '\n') {
                int len = i + 1 - buffer.position();
                ByteBuffer line = ByteBuffer.allocate(len);
                for (int j = 0; j < len; ++j) {
                    line.put(buffer.get());
                }
                line.flip();
                System.out.print(StandardCharsets.UTF_8.decode(line));
            }
        }
        buffer.compact();  // 将没有读完的数据移动到buffer头部,这里是处理半包和粘包的关键
    }
}

Selector(选择器)

概念继承于操作系统IO模型中多路复用IO模型中的selector,其主要作用是,用户可以把所有读写Channel都注册在某个Selector上,Selector会不断的轮询注册在上面的所有channel,如果某个channel为读写等事件做好准备,那么就处于就绪状态,通过Selector可以不断轮询发现出就绪的channel,进行后续的IO操作。为何要做这种设计呢?

如果每一个Channel都需要一个线程来为其IO过程提供服务,则会占用大量的内存,CPU需要在很多线程间进行切换,有太多额外开销,而且随着连接数量增加,线程数量会达到上限,无法支持大连接数。

有一种解决方案是使用有固定线程数量的线程池来处理所有连接请求,但线程池中的线程一旦被占用,就要阻塞等待IO完成才能被其他连接使用,如果IO请求花费时间很长,那会导致后续的大量IO请求需要排队等待。这种情况只适合处理短连接比较多的场景。

针对连接数量非常多,数据流量比较少的场景,多路复用的IO模型就比较适合。如下图所示,每一个Channel可以把自己注册到一个独立运行的Selector线程中,这个Selector线程会轮询所有Channel的读写状态,当发现一个就绪的Channel时,就可以使用工作线程为这个Channel提供服务。这样工作线程就不需要阻塞在某一个Channel上,只有真正要进行数据读写时才分配给某个Channel,极大提高了线程的利用率。

image

NIO消息服务器示例

点击查看代码
@Slf4j
public class Server {
    public static void main(String[] args) {
        try (Selector selector = Selector.open(); ServerSocketChannel scc = ServerSocketChannel.open();) {
            scc.configureBlocking(false);
            scc.bind(new InetSocketAddress(8080));

            final SelectionKey sccKey = scc.register(selector, 0, null);
            sccKey.interestOps(SelectionKey.OP_ACCEPT);  // 配置这个channel只关注accept事件

            while (true) {
                selector.select();
                final Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  // 每次准备就绪的channel会被放到selectedKeys这个集合中,但删除,需要手动删除
                while (iter.hasNext()) {
                    final SelectionKey key = iter.next();
                    log.debug("key: {}", key);
                    iter.remove();  // 手动从集合中删除处理过的key
                    if (key.isAcceptable()) {
                        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        final SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);  // 使用selector时,channel都需要配置成非阻塞的
                        log.debug("accept: {}", sc);
                        final SelectionKey scKey = sc.register(selector, 0, null);
                        scKey.interestOps(SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        try {
                            final SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            final int read = sc.read(buffer);
                            if (read < 0) {  // 客户端调用close()正常关闭时,会生成一次READ事件,实际read的返回值是-1,这里处理客户端正常退出
                                log.warn("cancel {}", key);
                                key.cancel();
                            } else {
                                buffer.flip();
                                System.out.println(StandardCharsets.UTF_8.decode(buffer));
                            }
                            log.debug("after read: {}", sc);
                        } catch (IOException e) {  // 这里处理客户端异常退出
                            e.printStackTrace();
                            key.cancel();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

处理消息边界问题

与Netty(1)——NIO基础相似的内容:

Netty(1)——NIO基础

本篇主要介绍Java NIO的基本原理和主要组件 Netty是由JBOSS提供的Java开源网络应用程序框架,其底层是基于Java提供的NIO能力实现的。因此为了掌握Netty的底层原理,需要首先了解Java NIO的原理。 NIO简介 计算机主要由CPU、内存、外存、IO设备等硬件组成,计算机执行

滴滴面试:谈谈你对Netty线程模型的理解?

Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。 Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一

Netty服务端开发及性能优化

Netty是一个异步基于事件驱动的高性能网络通信框架,可以看做是对NIO和BIO的封装,并提供了简单易用的API、Handler和工具类等,用以快速开发高性能、高可靠性的网络服务端和客户端程序。

netty系列之:来,手把手教你使用netty搭建一个DNS tcp服务器

简介 在前面的文章中,我们提到了使用netty构建tcp和udp的客户端向已经公布的DNS服务器进行域名请求服务。基本的流程是借助于netty本身的NIO通道,将要查询的信息封装成为DNSMessage,通过netty搭建的channel发送到服务器端,然后从服务器端接受返回数据,将其编码为DNSR

Netty-BIO、NIO、AIO、零拷贝-2

Java BIO 编程 一、I/O 模型 1、I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能 2、Java 共支持 3 种网络编程模型/IO 模式:BIO、NIO、AIO 3、Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个

Netty-架构设计及入门程序-3

一、原生 NIO 存在的问题 1、NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。2、需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor

Java NIO 图解 Netty 服务端启动的过程

本文在了解netty核心组件的前提下,进一步了解组件如何在整个服务器启动过程如何被创建,如何组件之间配合来使用。首先也是先了解下大概服务端的启动过程,并且在了解过程中我们带着自己的问题去在学习过程中探寻答案

一文详解 Netty 组件

Netty 是一款优秀的高性能网络框架,内部通过 NIO 的方式来处理网络请求,在高负载下也能可靠和高效地处理 I/O 操作。下面这篇文章将主要对 Netty 中的各个组件进行分析,并在介绍完了各个组件之后,通过 JSF 这个 RPC 框架为例来分析 Netty 的使用。

Netty-介绍-1

Netty介绍和应用场景 要求 已经掌握了 主要技术构成: Java OOP 编程、 Java 多线程编程、 Java IO 编程 、 Java 网络编程、 常用的Java 设计模式(比如 观察者模式 ,命令模式,职责链模式 )、 常用的数据结构(比如 链表) Netty的介绍 1、Netty 是由

美团面试:说说Netty的零拷贝技术?

零拷贝技术(Zero-Copy)是一个大家耳熟能详的技术名词了,它主要用于提升 IO(Input & Output)的传输性能。 那么问题来了,为什么零拷贝技术能提升 IO 性能? 1.零拷贝技术和性能 在传统的 IO 操作中,当我们需要读取并传输数据时,我们需要在用户态(用户空间)和内核态(内核空