首頁技術文章正文

Java培訓:看了就能懂的NIO使用深入詳解

更新時間:2022-11-18 來源:黑馬程序員 瀏覽量:

  NIO概述

  NIO介紹

  傳統(tǒng)IO流(java.io):讀寫操作結束前,處于線性阻塞,代碼簡單,安全,性能低

  NIO:支持非阻塞式編程,性能更有優(yōu)勢,但代碼編寫較為復雜。

  概念理解

  同步(synchronous):一條線程執(zhí)行期間,其他線程就只能等待。

  異步(asynchronous):一條線程在執(zhí)行期間,其他線程無需等待。

  阻塞(blocking):當前任務未執(zhí)行結束,會阻礙后續(xù)任務的執(zhí)行。

  非阻塞(non-blocking):當前任務未執(zhí)行結束,不會阻礙后續(xù)任務的執(zhí)行。

  IO流與NIO的區(qū)別

  NIO是面向緩沖區(qū),IO 面向流。

  NIO是非阻塞的,IO是阻塞的。

  NIO可以使用選擇器,IO不涉及選擇器。

  NIO組成

  Buffer(緩沖區(qū),負責讀寫數據,類似火車)

  Channel(通道 ,負責傳輸,類似鐵軌)

  Selector(選擇器,負責調度通道,類似指揮中心)

  Buffer

  介紹

  理解:實質相當于普通IO流中的數組,負責數據的存和取。但是它提供了對數據的結構化訪問,可以跟蹤系統(tǒng)的讀、寫進程。

  常見分類:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。

  核心屬性

  capacity:代表緩沖區(qū)的最大容量。

  limit:代表剩余(可存入/可讀取)數量

  position:代表(存入/讀取)位置

  mark:標記當前position的位置。

  四個屬性關系:mark <= position <= limit <= capacity

  構造方法(以ByteBuffer為例)

  static ByteBuffer allocate(int capacity)分配一個新的字節(jié)緩沖區(qū)。

  static ByteBuffer allocateDirect(int capacity) 分配新的直接字節(jié)緩沖區(qū)。

  static ByteBuffer wrap(byte[] array)將 byte 數組包裝到緩沖區(qū)中。

  常用方法

  獲取屬性值

  capacity():獲取緩沖區(qū)的最大容量。

  limit():獲取剩余(可存入/可讀取)數量。

  position():獲取(存入/讀取)位置。

  mark():標記當前position的位置。

  存取數據

  put(Xxx[] xxx) 存入數據到緩沖區(qū)中,position >= limit不可寫。

  get() 獲取緩沖區(qū)的position位置數據,并將position后移,position >= limit不可讀。

  核心方法

  flip()翻轉此緩沖區(qū)(limit=capacity-postion,postion=0),清除標記,用于讀取模式。

  clear()清除此緩沖區(qū)(limit=capacity,postion=0),清除標記,用于寫入模式。

  rewind() 倒回這個緩沖區(qū)(position=0),清除標記。

  reset() 將此緩沖區(qū)的位置重置為先前標記的位置(position=mark)。

  演示代碼

public class Test01Buffer {
    public static void main(String[] args) {
        //新建緩沖區(qū)對象(默認為寫入模式)
        ByteBuffer b = ByteBuffer.allocate(10);

        //寫入數據
        System.out.println("====寫入模式屬性狀態(tài)====");
        showProperty(b);
        System.out.println("====讀取模式屬性狀態(tài)====");
        //切換為讀取模式
        b.flip();
        showProperty(b);

        System.out.println("====寫入數據====");
        b.clear();
        b.put(new byte[]{1,2});
        showProperty(b);

        System.out.println("====讀取數據====");
        b.flip();
        System.out.println("position-------->" + b.position() + ",get:" + b.get());
        //循環(huán)遍歷通用格式
        //while (b.position()<b.limit()){
        //    System.out.println("position-------->" + b.position() + ",get:" + b.get());
        //}

        System.out.println("====重置操作位置前:記錄位置====");
        showProperty(b);
        //記錄位置
        b.mark();
        System.out.println("====重置操作位置前:獲取新數據====");
        System.out.println("position-------->" + b.position() + ",get:" + b.get());
        showProperty(b);
        System.out.println("====重置操作位置后====");
        b.reset();
        showProperty(b);

        System.out.println("====倒回緩沖區(qū)前====");
        showProperty(b);
        System.out.println("====倒回緩沖區(qū)后====");
        b.rewind();
        showProperty(b);
    }

    //展示參數
    public static void showProperty(ByteBuffer b) {
        //容量
        System.out.println("capacity:" + b.capacity());
        //可存放個數
        System.out.println("limit:" + b.limit());
        //下一個存入位置
        System.out.println("position:" + b.position());
    }
}

  Channel入門

  介紹

  理解 Channel理解為通道,包含了寫入和讀取的操作,可以理解為IO中的流對象。Channel負責讀寫,Buffer負責存取。

  常見分類:FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel

  Channel與IO流區(qū)別

  Channel是雙向的,既可以讀又可以寫,而IO是單向的。

  Channel可以進行異步的讀寫,IO是不支持異步。

  Channel的讀寫必須通過buffer對象,IO通過流可以直接讀寫。

  構造方法(以FileChannel為例 )

  在IO流FileXXX字節(jié)流中提供了getChannel()方法獲取FileChannel對象。

  FileChannel getChannel() 通過FileXXX字節(jié)流的方法獲取對象

  常用方法

  int read(ByteBuffer dst):將數據讀取到緩沖區(qū)中

  int write(ByteBuffer src):將數據從緩沖區(qū)中寫出到指定位置

  演示代碼

public class Test02FileChannel {
    public static void main(String[] args) throws IOException {
        FileChannel in = new FileInputStream("D:\\image.jpg").getChannel();
        FileChannel out = new FileOutputStream("D:\\imageCopy.jpg").getChannel();

        ByteBuffer b = ByteBuffer.allocate(10);
        int len = -1;
        while ((len = in.read(b)) != -1) {
            b.flip();
            out.write(b);
            b.clear();
        }
        in.close();
        out.close();
    }
}

  ChannelTCP協(xié)議編程

  介紹

  NIO中通過SocketChannel與ServerSocketChannel替代TCP協(xié)議的網絡通信編程

  客戶端通道操作

  SocketChannel 客戶端通道,用于讀寫TCP網絡協(xié)議數據

  獲取對象 public static SocketChannelopen()

  連接服務器 boolean connect(SocketAddress remote)

  SocketAddress是抽象類,使用其子類InetSocketAddress創(chuàng)建的對象。InetSocketAddress(String ip,int port)

  等待客戶端連接 SocketChannel accept()

  服務端通道操作

  ServerSocketChannel 服務端通道,用于服務端監(jiān)聽TCP連接

  獲取對象 public static ServerSocketChannel open()

  綁定端口號 ServerSocketChannel bind(SocketAddress local)

  服務器代碼

public class Test03ServerByChanner {
    public static void main(String[] args) throws IOException {
        //獲取服務器通道對象
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        //綁定端口
        ServerSocketChannel socket = serverSocket.bind(new InetSocketAddress(8888));
        SocketChannel server = socket.accept();

        //接收數據
        System.out.println("服務端開始接收數據......");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int len = -1;
        while ((len = server.read(buffer)) != -1) {
            //翻轉緩沖區(qū),讀取數據
            buffer.flip();
            System.out.println("server:" + new String(buffer.array()));
            buffer.clear();
        }

        System.out.println("服務端開始反饋數據......");

        buffer.put("數據收到了".getBytes());
        //翻轉緩沖區(qū),讀取數據
        buffer.flip();
        //取出緩沖區(qū)數據,寫會給客戶端
        server.write(buffer);
       
        server.close();

    }
}

  客戶端代碼

public class Test03ClientByChannel {
    public static void main(String[] args) throws Exception {
        //獲取連接對象
        SocketChannel client = SocketChannel.open();
        //連接服務器
        client.connect(new InetSocketAddress("localhost", 8888));

        //發(fā)送數據
        System.out.println("客戶端開始發(fā)送數據......");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("服務器,你好啊".getBytes());
        //翻轉緩沖區(qū),讀取數據
        buffer.flip();
        //從緩沖區(qū)取出數據寫入通道
        client.write(buffer);
        client.shutdownOutput();

        //等待反饋
        buffer.clear();
        int len = -1;
        while ((len = client.read(buffer)) != -1) {
            buffer.flip();
            System.out.println("client:" + new String(buffer.array()));
            buffer.clear();
        }
        //關閉客戶端
        client.close();
    }
}

  多路復用

  介紹

  非多路復用:服務器端需要為每個端口的每次請求,開辟線程處理業(yè)務,高并發(fā)狀態(tài)下會造成系統(tǒng)性能下降。

1668740360936_1.jpg

  多路復用:服務器端利用一個線程處理多個端口的訪問請求,節(jié)省CPU資源,提高程序運行效率,高并發(fā)狀態(tài)下有明顯優(yōu)勢。

1668740374304_2.jpg

  核心知識

      1.通過Selector中的open方法,獲取選擇器對象

  public static Selector open():獲取Selector對象

  2.通過Channel中的方法,注冊通道給選擇器

  ①創(chuàng)建通道對象,設置通道屏蔽模式

  void configureBlocking(boolean block)

 ?、趯⑼ǖ雷越o選擇器,并設置該通道的關注事件

  SelectionKey register(Selector sel,int ops)

  Selector sel 要注冊的選擇器

  ops表示注冊的事件類型,在 SelectionKey類中提供的四種類型實現。

  SelectionKey.OP_ACCEPT : 接收連接就緒事件,表示服務器監(jiān)聽到了客戶連接,服務器可以接收這個連接了

  SelectionKey.OP_CONNECT:連接就緒事件,表示客戶端和服務器的連接已經建立成功

  SelectionKey.OP_READ: 讀就緒事件,表示通道中有了可讀的數據,可以執(zhí)行讀操作了

  SelectionKey.OP_WRITE: 寫就緒事件,表示已經可以向通道中寫數據了

  注意事項

        被注冊的Channel必須支持異步模式,否則異步NIO就無法工作,例如FileChannel(沒有異步模式)不能被注冊到Selector。

        ServerSocketChannel在注冊時,只能使用以OP_ACCEPT狀態(tài)注冊,否則拋出異常。

        SocketChannel在注冊時,不支持OP_ACCEPT狀態(tài)注冊。

  3.通過Selector中的方法,獲取事件

  int select():將事件存放至事件集合,返回已就緒事件個數。如果沒有新的已就緒事件,該方法將持續(xù)阻塞。

  Selector的Set selectedKeys():返回選擇器的已就緒事件集。

  Set keys():返回選擇器的感興趣事件集(已注冊的事件數)。

  SelectionKey概述

         SelectionKey 代表一個通道在Selector的注冊事件關系鍵。

         當Selector通知某個傳入事件時,是通過對應 SelectionKey 進行傳遞的。

         想要取消已注冊的通道事件,需要通過SelectionKey的cancel方法完成。

  SelectionKey中屬性:

              Interest set:興趣集,表示已注冊的事件集合,下一次調用方法,將測試是否有此事件的加入。

              通過SelectionKey的 int interestOps() 方法,可以獲取當前 SelectionKey的感興趣事件。

              Ready set:準備集,表示已準備就緒的事件集合。

              通過SelectionKey的 int readyOps()方法,可以獲取當前 SelectionKey的準備就緒事件。

              Channel:事件對應的通道。

              通過SelectionKey的 SelectableChannel channel()方法,可以獲取當前 SelectionKey的表示的通道。

             Selector:事件綁定的選擇器。

             通過SelectionKey的 Selector selector() 方法,可以獲取當前 SelectionKey的綁定的選擇器。

            Attached:事件對象的附加信息。

            通過 SelectionKey的 Object attach(Object ob)方法,將給定對象附加到此鍵。

            通過 SelectionKey的 Object attachment()方法,檢索當前的附件。

            通過 Channel的SelectionKey register(Selector sel,int ops,Object ob)方法,可以附件及獲取附加信

  SelectionKey迭代器

  4.通過SelectionKey中的方法,判斷事件

  isAcceptable() 是否有準備好接收新連接

  isConnectable() 是否有完成連接狀態(tài)

  isReadable() 是否有處于可讀取狀態(tài)

  isWritable() 是否有處于可寫入狀態(tài)

  isValid() 是否是有效的鍵

  步驟

  1.獲取選擇器對象

  2.創(chuàng)建通道對象,設置異步,注冊到選擇器

  3.定義死循環(huán),重復檢查是否有新事件觸發(fā)(Selector中的int select()方法)

  3.1.如果觸發(fā)新時間,獲取所有觸發(fā)事件集(Selector的Set selectedKeys()方法)

  3.2.獲取觸發(fā)事件集合的迭代器

  3.3.遍歷迭代器,獲取所有觸發(fā)的事件

       3.3.1判斷觸發(fā)事件類型,指向相應操作 舉例 if (selectionKey.isAcceptable()) {}

       3.3.2刪除已完成操作的觸發(fā)事件 (Iterator的remove()方法)

  服務器端代碼

public class Test04ServerBySelector {
    public static void main(String[] args) throws IOException, InterruptedException {

        //獲取一個選擇器
        Selector selector = Selector.open();

        //創(chuàng)建三個服務器通道,監(jiān)聽三個端口
        ServerSocketChannel serverChannel1 = ServerSocketChannel.open();
        serverChannel1.bind(new InetSocketAddress(6666));
        serverChannel1.configureBlocking(false);
        ServerSocketChannel serverChannel2 = ServerSocketChannel.open();
        serverChannel2.bind(new InetSocketAddress(7777));
        serverChannel2.configureBlocking(false);
        ServerSocketChannel serverChannel3 = ServerSocketChannel.open();
        serverChannel3.bind(new InetSocketAddress(8888));
        serverChannel3.configureBlocking(false);

        //將三個服務器通道注冊給選擇器
        serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
        serverChannel2.register(selector, SelectionKey.OP_ACCEPT);
        serverChannel3.register(selector, SelectionKey.OP_ACCEPT);

        //循環(huán)監(jiān)聽三個通道
        while (true) {
            System.out.println("--------");
            System.out.println("等待客戶端連接...");

            //獲取觸發(fā)的事件個數
            int keyCount = selector.select();//阻塞式方法
            System.out.println("有一個客戶端連接成功...");
            System.out.println("已就緒事件個數=" + keyCount);
            System.out.println("注冊通道數量=" + selector.keys().size());


            //獲取觸發(fā)事件集
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("觸發(fā)事件數量=" + selectionKeys.size());

            //獲取事件集迭代器
            Iterator<SelectionKey> it = selectionKeys.iterator();

            //遍歷事件集
            while (it.hasNext()) {
                //獲取注冊鍵
                SelectionKey selectionKey = it.next();

                //使用選擇器完成數據讀取
                if (selectionKey.isAcceptable()) {
                    //獲取通道對象
                    ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                    //獲取服務器與客戶端的連接
                    SocketChannel server = channel.accept();
                    //設置非阻塞
                    server.configureBlocking(false);
                    //注冊讀取事件
                    server.register(selector, selectionKey.OP_READ);
                    //selectionKey.interestOps(selectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    //獲取客戶端數據
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    SocketChannel server = (SocketChannel) selectionKey.channel();
                    server.read(buffer);
                    buffer.flip();
                    String content = new String(buffer.array(), 0, buffer.limit());
                    System.out.println("客戶端發(fā)送的數據:" + content);
                    //關閉資源
                    server.close();

                }
                //刪除當前觸發(fā)事件
                it.remove();
            }
            System.out.println("休息1秒,等待下一次操作...");
            Thread.sleep(1000);
        }
    }
}

  客戶端代碼

public class Test04ClientByChannel {
    public static void main(String[] args) {
        int[] ports = {7777, 8888, 6666};
        for (int i = 0; i < ports.length; i++) {
            int port = ports[i];
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //創(chuàng)建客戶端通道
                        SocketChannel client = SocketChannel.open();
                        //連接服務器
                        client.connect(new InetSocketAddress("localhost", port));
                        //發(fā)送數據
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        buffer.put("你好啊,哈哈哈".getBytes());
                        buffer.flip();
                        client.write(buffer);
                        //關閉資源
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

  異步非阻塞交互(AIO)

  介紹

  支持異步操作的NIO體系

  常見分類:

         AsynchronousSocketChannel 客戶端異步通道

         AsynchronousServerSocketChannel服務端異步通道

         AsynchronousFileChannel文件異步通道

         AsynchronousDatagramChannel 數據異步通道

  CompletionHandler回調接口

     void completed(V result,A attachment);異步操作成功被回調。

     void failed(Throwable exc,A attachment);異步操作失敗時被回調。

  AsynchronousSocketChannel常用方法

  public static AsynchronousSocketChannel open();打開異步服務器套接字通道。

  void read(ByteBuffer dst,A attachment,CompletionHandler handler) 讀取數據。

  void write(ByteBuffer src,A attachment,CompletionHandler handler) 寫出數據

  AsynchronousServerSocketChannel常用方法

  public static AsynchronousServerSocketChannel open()打開異步服務器套接字通道。

  AsynchronousServerSocketChannel bind(SocketAddress local,int backlog) ;綁定服務端IP地址,端口號

  void accept(A attachment,CompletionHandler handler) ;接收連接

  服務器端代碼

package com.NIO.src.com.itheima;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

public class Test05ServerBySynChanner {
    //如果為true,服務器結束。
    static boolean isOver = false;

    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {


        //獲取服務器通道
        AsynchronousServerSocketChannel serverChanner = AsynchronousServerSocketChannel.open();
        //綁定端口號
        serverChanner.bind(new InetSocketAddress(8888));
        // 獲取服務器與客戶端的對接
        serverChanner.accept("accept", new CompletionHandler<AsynchronousSocketChannel, String>() {
            @Override
            public void completed(AsynchronousSocketChannel result, String attachment) {
                try {
                    isOver=true;
                    System.out.println("接受了一個連接:" + result.getLocalAddress()
                            .toString());
                    // 給客戶端發(fā)送數據并等待發(fā)送完成
                    result.write(ByteBuffer.wrap("From Server:我是服務器".getBytes()))
                            .get();
                    ByteBuffer readBuffer = ByteBuffer.allocate(128);
                    // 阻塞等待客戶端接收數據
                    result.read(readBuffer).get();
                    System.out.println(new String(readBuffer.array()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                isOver=true;
                System.out.println("連接失敗");
            }
        });

        //由于異步執(zhí)行,所以上述操作不會阻礙當前循環(huán)的執(zhí)行。
        while (true) {
            if (isOver) {
                break;
            }
            System.out.println("服務端:先干為敬");
        }

    }
}

  客戶端代碼

public class Test05ClientBySynChannel {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {

        //創(chuàng)建客戶端通道對象
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        //與服務器進行連接
        client.connect(new InetSocketAddress("localhost", 8888), "connect", new CompletionHandler<Void, String>() {
            @Override
            public void completed(Void result, String attachment) {
                System.out.println("連接到服務器成功!");
                try {
                    // 給服務器發(fā)送信息并等待發(fā)送完成
                    client.write(ByteBuffer.wrap("From client:我是服務器".getBytes())).get();
                    ByteBuffer readBuffer = ByteBuffer.allocate(128);
                    // 阻塞等待接收服務端數據
                    client.read(readBuffer).get();
                    System.out.println(new String(readBuffer.array()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println("連接到服務器失敗");
            }
        });
    }
}
分享到:
在線咨詢 我要報名
和我們在線交談!