io

IO 系列 NIO

Posted by lichao modified on September 25, 2019

NIO(Non-block I/O)

NIO 提供了 SocketChannel 和 ServerSocketChannel 两种不同的的套接字通道实现。这两种通道都支持阻塞和非阻塞两种模式。

Selector

存储概览 Open JDK 中 Selector 的实现是 SelectorImpl,然后 SelectorImpl 又将职责委托给了具体的平台,比如图中框出的 Linux 2.6 以后才有的 EpollSelectorImpl, Windows 平台的 WindowsSelectorImpl, MacOSX 平台的 KQueueSelectorImpl。

源码追踪

获取 Selector

Selector.open() 可以得到一个 Selector 实例:

缓冲区

缓冲区实质上是一个数组,通常它是一个字节数组(byte buffer),也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区还提供了对数据的结构化访问以及维护读写位置(limit)等信息。

所有缓冲区都有4个属性:capacity、limit、position、mark,并遵循:mark <= position <= limit <= capacity,下表格是对着4个属性的解释:

  • clear(): position = 0;limit = capacity;mark = -1; 缓冲区初始化,但是并不影响底层byte数组的内容
  • flip(): limit = position;position = 0;mark = -1; 翻转就是将一个处于存数据状态的缓冲区变为一个处于准备取数据的状态
  • remaining(): return limit - position; 返回limit和position之间相对位置差
  • hasRemaining(): return position < limit返回是否还有未读内容
  • rewind(): 把position设为0,mark设为-1,不改变limit的值
  • compact(): 把从position到limit中的内容移到0到limit-position的区域内,position和limit的取值也分别变成limit-position、capacity。

对于compact()方法,其主要作用在于在读取模式下进行数据压缩,并且方便下一步继续写入数据。比如在一个长度为6的ByteBuffer中写满了数据,然后在读取模式下读取了三个数据之后,我们想继续往buffer中写入数据,此时由于只有前三个字节是可用的,而后三个字节是有效的数据,此时如果写入的话是会把后面三个有效字节给覆盖掉的。因而需要将后面三个有效字节往前移动,以空出三个字节,并且将position指向下一个可供写入的位置,而不是迁移之后的索引0处。compact()方法的作用即在于此。

  • get(): 相对读,从position位置读取一个byte,并将position+1,为下次读写作准备
  • get(int index): 绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position
  • put(byte b): 相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备
  • put(int index, byte b): 绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position
  • mark(): 这个属性是一个标识的作用,即记录当前position的位置
  • rewind(): 将当前的position重置为0,并且mark重置为-1。对于写入模式,rewind()方法会重置为初始写入状态,对于读取模式,rewind()则会重置为初始读取模式,其不会对limit属性有任何影响。

写入模式
如下图所示为初始状态和写入3个字节之后position,limit和capacity三个属性的状态: netty

读取模式
假设我们按照上述方式在初始长度为6的ByteBuffer中写入了三个字节的数据,此时我们将模式切换为读取模式,那么这里的position,limit和capacity则变为如下形式: netty

channel

通道可以用于读写、或两者同时进行

selector

多路复用器Selector,提供选择已经就绪的任务的能力。 selector会不断的轮询注册在其上的channel,如果某个channel上面发生了读写事件,这个channel 就处于就绪状态,会被selector轮询出来,然后通过selectionKey可以获取就绪Chanel的集合,然后进行后续的I/O操作。

(见IO模块/selector)

实例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;
    /**
     * 初始化多路复用器、绑定监听端口
     *
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }




    public void stop() {
        this.stop = true;
    }




    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }




    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                            + body);
                    String currentTime = "QUERY TIME ORDER"
                            .equalsIgnoreCase(body) ? new java.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else
                    ; // 读到0字节,忽略
            }
        }
    }




    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {

        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
                .start();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
public class TimeClientHandle implements Runnable {

    private String host;

    private int port;

    private Selector selector;

    private SocketChannel socketChannel;

    private volatile boolean stop;




    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }




    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

    }




    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else
                    System.exit(1);// 连接失败,进程退出
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else i j
                    ; // 读到0字节,忽略
            }
        }

    }




    private void doConnect() throws IOException {
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }




    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining())
            System.out.println("Send order 2 server succeed.");
    }

}