管道流:

Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

Java NIO 必知必会(Example)-LMLPHP

 1 package base.nio.threaddemo;
 2
 3 import java.io.IOException;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.Pipe;
 6
 7 /**
 8  * @program: Lear-Java
 9  * @description:
10  * @author: Mr.Dai
11  * @create: 2018-10-05 20:43
12  **/
13 public class ThreadSend {
14
15     private Pipe pipe;
16
17
18     private void init() throws Exception {
19         this.pipe = Pipe.open();
20     }
21
22
23     class SendInner1 extends Thread {
24
25         @Override
26         public void run() {
27             // 单向流 发送数据
28             try {
29                 Pipe.SinkChannel sink = pipe.sink();
30                 sink.configureBlocking(false);
31
32                 while (true) {
33                     if (sink.isOpen()) {
34                         sink.write(ByteBuffer.wrap("abcd".getBytes()));
35                     }
36                     Thread.sleep(1000);
37                 }
38             } catch (InterruptedException | IOException e) {
39                 e.printStackTrace();
40             }
41         }
42     }
43
44     class ReverInner extends Thread {
45         @Override
46         public void run() {
47             try {
48                 // 单向流 拿到数据
49                 Pipe.SourceChannel source = pipe.source();
50
51                 source.configureBlocking(false);
52
53                 while (true) {
54                     if (source.isOpen()) {
55                         ByteBuffer buffer = ByteBuffer.allocate(10);
56                         buffer.clear();
57                         source.read(buffer);
58                         // 这里必须去掉 trim
59                         if(new String(buffer.array()).trim().equals("")){
60                             continue;
61                         }
62                         System.out.println(new String(buffer.array()).trim());
63                     }
64                     Thread.sleep(1000);
65                 }
66             } catch (InterruptedException | IOException e) {
67                 e.printStackTrace();
68             }
69         }
70     }
71
72     public static void main(String[] args) throws Exception {
73         ThreadSend send = new ThreadSend();
74
75         send.init();
76
77         SendInner1 sendI = send.new SendInner1();
78
79         ReverInner revI = send.new ReverInner();
80
81         sendI.start();
82         revI.start();
83     }
84
85
86 }

套接字通道流

非阻塞模式

ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。如:

 1 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 2
 3 serverSocketChannel.socket().bind(new InetSocketAddress(9999));
 4 serverSocketChannel.configureBlocking(false);
 5
 6   while(true){
 7     SocketChannel socketChannel =
 8             serverSocketChannel.accept();
 9
10     if(socketChannel != null){
11         //do something with socketChannel...  
12     }
13 }  

server:

 1 package base.nio.chatdemo;
 2
 3
 4 import java.net.InetSocketAddress;
 5 import java.nio.ByteBuffer;
 6 import java.nio.channels.SelectionKey;
 7 import java.nio.channels.Selector;
 8 import java.nio.channels.ServerSocketChannel;
 9 import java.nio.channels.SocketChannel;
10 import java.util.Iterator;
11 import java.util.Set;
12
13 /**
14  * @program: Lear-Java
15  * @description: Nio 聊天服务端
16  * @author: Mr.Dai
17  * @create: 2018-10-05 16:31
18  **/
19 public class ChatServer {
20
21     /**
22      * 通道管理器
23      */
24     private Selector selector;
25
26     private void initServer(int port) throws Exception{
27
28         ServerSocketChannel serverChannel  = ServerSocketChannel.open();
29
30         serverChannel .socket().bind(new InetSocketAddress(port));
31         // 配置非阻塞
32         serverChannel .configureBlocking(false);
33
34
35         this.selector=Selector.open();
36
37         /**
38          * 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
39          * 注册该事件后,当事件到达的时候,selector.select()会返回,
40          * 如果事件没有到达selector.select()会一直阻塞
41          * selector.selectNow() 立即返回 无论是否准备好 可能返回0
42          */
43         serverChannel .register(this.selector, SelectionKey.OP_ACCEPT);
44
45     }
46
47     /**
48      * 采用轮训的方式监听selector上是否有需要处理的事件,如果有,进行处理
49      */
50     public void listen() throws Exception {
51         System.out.println("start------------------->");
52         while (true){
53             // 在没有注册事件来到时 将会一直阻塞
54             selector.select();
55             Set<SelectionKey> set = selector.selectedKeys();
56             Iterator<SelectionKey> iterator = set.iterator();
57
58             while (iterator.hasNext()){
59                 SelectionKey key = iterator.next();
60                 // 移除当前阻塞队列
61                 iterator.remove();
62                 if(key.isAcceptable()){
63                     ServerSocketChannel server = (ServerSocketChannel) key.channel();
64
65                     SocketChannel channel = server.accept();
66                     channel.configureBlocking(false);
67                     // 服务端发送数据
68                     channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
69                     // 在客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限
70                     channel.register(this.selector,SelectionKey.OP_READ);
71
72                 }else if(key.isReadable()){
73                     SocketChannel channel  = (SocketChannel) key.channel();
74
75                     ByteBuffer buffer = ByteBuffer.allocate(10);
76                     channel.read(buffer);
77
78                     String msg = new String(buffer.array()).trim();
79
80                     System.out.println("客户端发送过来的讯息:"+msg);
81                     // 在读取后 将柱塞队列数据 改变监听为Accept
82                     ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
83                     channel.write(outBuffer);
84                 }
85                           }
86         }
87
88     }
89
90     public static void main(String[] args)  throws Exception{
91         ChatServer server = new ChatServer();
92         server.initServer(8989);
93         server.listen();
94     }
95
96 }

clien:

 1 package base.nio.chatdemo;
 2
 3 import java.io.IOException;
 4 import java.net.InetSocketAddress;
 5 import java.nio.ByteBuffer;
 6 import java.nio.channels.SelectionKey;
 7 import java.nio.channels.Selector;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10
11 /**
12  * @program: Lear-Java
13  * @description: nio 聊天客户端
14  * @author: Mr.Dai
15  * @create: 2018-10-05 16:31
16  **/
17 public class ChatClient {
18
19
20     /**
21      *  提供柱阻塞队列 管理器
22      */
23     private Selector selector;
24
25
26     private void ininCliect(String ip,int port) throws Exception{
27
28         SocketChannel channel  = SocketChannel.open();
29
30         channel .connect(new InetSocketAddress(ip,port));
31
32         this.selector=Selector.open();
33
34         channel .configureBlocking(false);
35
36
37         channel .register(this.selector, SelectionKey.OP_CONNECT);
38
39     }
40
41     public void listen() throws Exception {
42
43         while (true){
44
45             selector.select();
46
47             Iterator<SelectionKey> ite  = selector.selectedKeys().iterator();
48
49             while (ite.hasNext()){
50                 SelectionKey key = ite .next();
51                 ite .remove();
52                 if(key.isConnectable()){
53                     SocketChannel channel = (SocketChannel) key.channel();
54                     // 是否准备好连接
55                     if(channel.isConnectionPending()){
56                         channel.finishConnect();
57                     }
58                     channel.configureBlocking(false);
59                     // 向server 发送数据
60                     channel.write(ByteBuffer.wrap("向server 发送数据".getBytes()));
61
62                     channel.register(selector,SelectionKey.OP_READ);
63
64                 }else if(key.isReadable()){
65                         m1(key);
66                 }
67             }
68         }
69     }
70
71     private void m1(SelectionKey key) throws IOException {
72         SocketChannel channel = (SocketChannel) key.channel();
73
74         ByteBuffer buffer = ByteBuffer.allocate(10);
75         channel.read(buffer);
76         System.out.println("服务端的消息为:"+new String(buffer.array()));
77
78         ByteBuffer outBuffer = ByteBuffer.wrap(new String("aaa").getBytes());
79         channel.write(outBuffer);
80     }
81
82     public static void main(String[] args) throws Exception {
83         ChatClient client = new ChatClient();
84
85         client.ininCliect("127.0.0.1",8989);
86         client.listen();
87     }
88
89 }
10-05 21:21