一、NIO概述
1. BIO带来的挑战
BIO即堵塞式I/O,数据在写入或读取时都有可能堵塞,一旦有堵塞,线程将失去CPU的使用权,性能较差。
2. NIO工作机制
Java NIO由Channel、Buffer、Selector三个核心组成,NIO框架类结构图如下:
其中,Buffer主要负责存取数据,Channel用于数据传输,获取数据,然后流入Buffer;或从Buffer取数据,发送出去。
Selector允许单线程处理多个Channel,如果打开了多个连接(Channel),但每个连接的数据流量很小,使用Selector则很方便。
二、Channel
Channel类主要位于java.nio.channels包下,类结构图如下:
Channel跟流相似,但流是单向的,而Channel是双向的。Channel总是从Buffer获取数据(写文件)或将数据写入Buffer(读文件时)。常用的Channel如下:
- FileChannel,从文件中读写数据。
- DatagramChannel,能通过UDP读写网络中的数据。
- SocketChannel,能通过TCP读写网络中的数据。
- ServerSocketChannel,可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。


package com.yyn.nio;import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel;public class ByteBufferTest {public static void main(String [] args) throws IOException{// testRead(); //从文件读取数据,chanel向buffer中写数据 testWrite();}//写文件public static void testWrite() throws IOException{RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw");FileChannel fChannel = raFile.getChannel();String data = "天王盖地虎\n小鸡炖蘑菇\n要从此路过\n就得跳支舞";byte[] dataByte = data.getBytes("UTF-8");System.out.println(dataByte.length);ByteBuffer buf = ByteBuffer.allocate(dataByte.length); buf.put(dataByte, 0, dataByte.length);buf.flip(); //切换buffer到读模式fChannel.write(buf); //从buffer读取数据到channelfChannel.force(true); //强制将数据刷新到磁盘,不一定有用 buf.clear();buf.put(dataByte, 0, 10);buf.mark();buf.put(dataByte,10,10);buf.reset();buf.put(dataByte, 0, 10);buf.flip(); //切换buffer到读模式fChannel.write(buf); //从buffer读取数据到channel fChannel.close();System.out.println("write over!!");}//读文件public static void testRead() throws IOException {RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw");FileChannel fChannel = raFile.getChannel();ByteBuffer buf = ByteBuffer.allocate(10);int byteRead = fChannel.read(buf);StringBuffer sBuffer = new StringBuffer();while(byteRead != -1){buf.flip(); //change to read modebyte [] bs = null;int limite = buf.limit();if(buf.hasArray()){bs = buf.array();}if(bs != null){System.out.println("bs length: "+limite);sBuffer.append(new String(bs,0,limite ,"UTF-8"));} buf.clear(); // make buffer ready for write,clear all buffer//buf.compact(); // make buffer ready for write,clear data readed in buffer byteRead = fChannel.read(buf);}fChannel.close();System.out.println("####:"+sBuffer.toString());}}
2.NIO优化方法
2.1 FileChannel.transformXXX方法
2.2 FileChannel.map方法
三、Buffer
Buffer是一片缓冲区,可读可写,非线程安全的,NIO包中针对常用的类型设置了Buffer,类结构图如下:
要使用Buffer,需记住3个方法和4个特性
- flip()方法,切换Buffer为读状态,此时Buffer可读。limit设置为position,position设置为0
- clear()方法,切换Buffer为写状态,会清空Buffer里所有数据。position为0,limit置为capacity
- compact()方法,切换Buffer为写状态,清空Buffer里所有已读数据,将未读数据剪切到Buffer前端。position设置为limit,limit设置为capacity
要理解其4个特性,
- capacity,Buffer的总长度,该值总是保持不变。A buffer's capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes
- position,下一个要操作的数据元素的位置,该值总是小于等于capacity和limit。Buffer为读状态时,表示下一个要读的位置,Buffer为写状态时,表示下一个要写的位置。
A buffer's position is the index of the next element to be read or written. A buffer's position is never negative and is never greater than its limit.
- limit,Buffer中第一个不可操作元素的位置,limit<=capacity。A buffer's limit is the index of the first element that should not be read or written. A buffer's limit is never negative and is never greater than its capacity.
- mark,用于记录当前position的前一个位置
Buffer状态转换过程描述
从Buffer中读数据方式:buffer.get()方法和channel.write()方法。
向Buffer中写数据方式:buffer.put()方法和channel.read()方法。


package com.yyn.nio;import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel;public class ByteBufferTest {public static void main(String [] args) throws IOException{// testRead(); //从文件读取数据,chanel向buffer中写数据 testWrite();}//写文件public static void testWrite() throws IOException{RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw");FileChannel fChannel = raFile.getChannel();String data = "天王盖地虎\n小鸡炖蘑菇\n要从此路过\n就得跳支舞";byte[] dataByte = data.getBytes("UTF-8");System.out.println(dataByte.length);ByteBuffer buf = ByteBuffer.allocate(dataByte.length); buf.put(dataByte, 0, dataByte.length);buf.flip(); //切换buffer到读模式fChannel.write(buf); //从buffer读取数据到channelfChannel.force(true); //强制将数据刷新到磁盘,不一定有用 buf.clear();buf.put(dataByte, 0, 10);buf.mark();buf.put(dataByte,10,10);buf.reset();buf.put(dataByte, 0, 10);buf.flip(); //切换buffer到读模式fChannel.write(buf); //从buffer读取数据到channel fChannel.close();System.out.println("write over!!");}//读文件public static void testRead() throws IOException {RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw");FileChannel fChannel = raFile.getChannel();ByteBuffer buf = ByteBuffer.allocate(10);int byteRead = fChannel.read(buf);StringBuffer sBuffer = new StringBuffer();while(byteRead != -1){buf.flip(); //change to read modebyte [] bs = null;int limite = buf.limit();if(buf.hasArray()){bs = buf.array();}if(bs != null){System.out.println("bs length: "+limite);sBuffer.append(new String(bs,0,limite ,"UTF-8"));} buf.clear(); // make buffer ready for write,clear all buffer//buf.compact(); // make buffer ready for write,clear dat a readed in buffer byteRead = fChannel.read(buf);}fChannel.close();System.out.println("####:"+sBuffer.toString());}}
2. Buffer其他方法介绍
2.1 rewind()方法
Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少。
2.2 equals()方法
当满足下列条件时,表示两个Buffer相等:
有相同的类型(byte、char、int等)。
Buffer中剩余的byte、char等的个数相等。
Buffer中所有剩余的byte、char等都相同。
如你所见,equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。
2.3 compareTo()方法
compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
第一个不相等的元素小于另一个Buffer中对应的元素 。
所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。
3. Buffer的Scatter/Gather
scatter(分散)是指从Channel读取数据后,写入到多个Buffer中。
gather(聚集)是指写操作时,从多个Buffer读取数据并写入到一个Channel中。
四、Selector
Selector在NIO编程中充当一个调度器的角色,轮训在其注册的channel是否ready,若ready则开始执行操作。
仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。
1. Selector介绍
Selector包含3个Set对象来管理SelectionKey对象,分别是以下三种:
使用Selector前,需要确保以下操作已经执行完成:
- Selector selector = Selector.open(); //调用open方法,获取一个Selector实例。
- channel.configureBlocking(false); // 设置Channel为非堵塞模式
- channel.register(selector , SelectionKey.OP_ACCEPT); //将Channel注册到selector中,并设置需监听的事件
可以监听四种不同类型的事件:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
2. Selector常用方法
2.1 selectXXX()方法
- int select(),返回就绪channel的个数,会堵塞。
- int select(long timeout),返回就绪channel个数,堵塞timeout
- int selectNow(),返回就绪channel个数,不堵塞
注意:每次调用selectXXX方法时,会返回此次就绪数量,例如,有一个channel就绪,则返回1,但未对这个channel的数据进行处理。接下来又有一个channel就绪,调用selectXXX方法还是返回1,但实际上此时有2个channel就绪但未被处理。
2.2 selectedKeys()
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。
当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。
注意:Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
2.3 wakeUp()
某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。
2.4 close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
3. 基于NIO的网络Demo
3.1 单线程版,主线程负责处理accept和read


package com.yyn.nio.net;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;/*** 本例子服务端只处理accept和read事件,单线程版* * @author Michael**/ public class NIOSingleServer {private Selector selector = null;//private ExecutorService pool;public static Charset charset = Charset.forName("UTF-8");public NIOSingleServer init(int port) throws IOException {//pool = Executors.newFixedThreadPool(5);ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 设置为非堵塞模式ssc.socket().bind(new InetSocketAddress(port));selector = Selector.open(); // 获取一个selector ssc.register(selector, SelectionKey.OP_ACCEPT);return this;}public void listen() throws IOException {System.out.println("Server started.....");while (true) {int n = 0;n = selector.select(); // 获取就绪操作的个数if(n == 0){continue;}Iterator<SelectionKey> it = selector.selectedKeys().iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove(); // 每次使用后需要手工移除SocketChannel channel = null;if (key.isAcceptable()) {try {// init函数中注册的是ServerSocketChannelServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();// 获取实际的SocketChannel,类似于Socket IO中的Socketchannel = serverSocketChannel.accept();System.out.println("客户端:" + channel.getRemoteAddress() + "已连接");channel.configureBlocking(false);SelectionKey k = channel.register(selector, SelectionKey.OP_READ); // 注册read监听,监听客户端发过来的数据//Worker worker = new Worker(k);//k.attach(worker);} catch (Exception e) {if (channel != null) {channel.close();}}} else {if (key.isReadable()) {System.out.println("begin to process read!!!!");channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.clear(); //切换buffer为写模式int len = 0;try{while ((len = channel.read(buffer)) > 0) {buffer.flip(); //切换buffer为read模式System.out.println("客户端数据:"+charset.decode(buffer).toString());buffer.clear();}if(len == -1){ // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream System.out.println("客户端断开");channel.close();continue;}}catch(Exception e){System.out.println("客户端异常啦");}}if (key.isWritable()) {}}}}}public static void main(String[] args) throws IOException {// TODO Auto-generated method stubNIOSingleServer server = new NIOSingleServer();server.init(12003).listen();}}


package com.yyn.nio.net;import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException;public class NIOSingleClient {public static void main(String[] args) throws UnknownHostException, IOException {// TODO Auto-generated method stubSocket socket = new Socket("127.0.0.1", 12003);OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");PrintWriter out = new PrintWriter(osw);InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8");BufferedReader in = new BufferedReader(isr);String data = "";while(true){data = in.readLine();data = data.trim().toUpperCase();if(data.equals("EIXT")){out.close();socket.close();System.exit(0);}System.out.println("read data from comsole:" + data);out.println(data);out.flush();System.out.println("sending data to server:" + data);}}}
3.2 多线程版,主线程负责accept,子线程负责read


package com.yyn.nio.net;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.logging.LoggingMXBean;/*** 本例子服务端只处理accept和read事件,多线程版* @author Michael**/ public class NIOMultiServer {private Selector acceptSelector = null;private Selector readSelector = null;public static Charset charset = Charset.forName("UTF-8");public NIOMultiServer init(int port) throws IOException {//pool = Executors.newFixedThreadPool(5);ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 设置为非堵塞模式ssc.socket().bind(new InetSocketAddress(port));acceptSelector = Selector.open(); // 获取一个selectorreadSelector = Selector.open();ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);return this;}public void listen() throws IOException {System.out.println("Server started.....");new Worker(this.readSelector).start(); while (true) {int n = 0;n = acceptSelector.select();if(n == 0)continue;Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();// init函数中注册的是ServerSocketChannelServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();// 获取实际的SocketChannel,类似于Socket IO中的Socket,必须accept后才有SocketChannelSocketChannel channel = serverSocketChannel.accept();channel.configureBlocking(false);System.out.println("客户端:" + channel.getRemoteAddress() + "已连接");if(key.isAcceptable()){channel.register(this.readSelector, SelectionKey.OP_READ);}}}}private static class Worker extends Thread{private Selector readSelector = null;public Worker(Selector selector){this.readSelector = selector;}public void run(){System.out.println("Read thread started....");while (true) {int n = 0;SocketChannel channel = null;try {n= this.readSelector.select(10);if(n == 0)continue;System.out.println("read thread, n is: " + n);Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();if(key.isReadable()){channel = (SocketChannel) key.channel();System.out.println("begin to process read at: " + channel.getRemoteAddress());ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.clear(); //将buffer切换为写模式long len = 0;while((len = channel.read(buffer)) > 0){buffer.flip(); //将buffer切换为读模式System.out.println("客户端数据:"+charset.decode(buffer).toString());buffer.clear();}if(len == -1){System.out.println("客户端断开");channel.close();continue;}}}} catch (IOException e) {System.out.println("客户端异常啦");try {channel.close();} catch (IOException e1) {// TODO Auto-generated catch blockSystem.out.println("关闭channel发生异常");}}}}}public static void main(String[] args) throws IOException {NIOMultiServer server = new NIOMultiServer();server.init(12003);server.listen();}}


package com.yyn.nio.net;import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException;public class NIOSingleClient {public static void main(String[] args) throws UnknownHostException, IOException {// TODO Auto-generated method stubSocket socket = new Socket("127.0.0.1", 12003);OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");PrintWriter out = new PrintWriter(osw);InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8");BufferedReader in = new BufferedReader(isr);String data = "";while(true){data = in.readLine();data = data.trim().toUpperCase();if(data.equals("EIXT")){out.close();socket.close();System.exit(0);}System.out.println("read data from comsole:" + data);out.println(data);out.flush();System.out.println("sending data to server:" + data);}}}
dd