자바 NIO 기초: 동기 비동기 I/O 모델

  1. NIO 소개 NIO(New I/O)는 Java 1.4부터 도입된 동기 비동기 I/O 모델로, 기존 자바 IO와 네트워킹 API를 대체하기 위해 설계되었습니다. NIO의 핵심 특징은 단일 스레드가 여러 클라이언트의 입출력 작업을 동시에 처리할 수 있다는 점입니다.

NIO에서는 스레드가 선택기(Selector)를 관리하며, 클라이언트의 읽기/쓰기 작업이 바로 스레드에 직접 전달되지 않고 버퍼를 거칩니다. 채널(Channel)은 버퍼에서 해당 이벤트를 감지하고, 준비된 읽기/쓰기 이벤트가 발생하면 선택기에 알립니다. 이렇게 준비된 채널만 데이터를 읽고 쓰며, 현재 채널에 입출력 이벤트가 없을 경우 다른 준비된 채널을 처리함으로써 단일 스레드로 여러 클라이언트를 효율적으로 관리할 수 있습니다.

  1. 세 가지 핵심 구성 요소 NIO의 세 가지 핵심 구성 요소는 버퍼(Buffer), 채널(Channel), 선택기(Selector)입니다.

2.1 버퍼(Buffer) 버퍼는 데이터를 쓸 수 있는 메모리 블록(배열과 유사)이며, NIO Buffer 객체에 캡슐화되어 제공됩니다. 이 객체는 메모리 블록을 더 쉽게 조작하고 관리할 수 있는 메서드 세트를 제공합니다.

2.1.1 버퍼 읽기/쓰기 단계 첫째: 데이터를 버퍼에 쓰기 둘째: buffer()를 호출하여 읽기 모드로 전환 셋째: 버퍼에서 데이터 읽기 넷째: buffer.clear() 또는 buffer.compact()로 버퍼 초기화

2.1.2 버퍼 작동 원리 세 가지 중요한 속성:

  • 용량(capacity): 메모리 블록의 고정된 크기
  • 위치(position): 쓰기 모드에서는 데이터 쓰기 위치, 읽기 모드에서는 데이터 읽기 위치
  • 한계(limit): 쓰기 모드에서는 용량과 동일, 읽기 모드에서는 쓰여진 데이터의 양

예제 코드 2.1.2.1:

package com.java.nio.example;

import java.nio.ByteBuffer;

public class BufferExample {
    public static void main(String[] args) {
        // 용량이 4인 바이트 버퍼 생성
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);
        
        // 초기 상태 출력
        System.out.println(String.format("초기 상태: 용량: %d, 위치: %d, 한계: %d",
                byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));
        
        // 3바이트 데이터 쓰기
        byteBuffer.put((byte) 10);
        byteBuffer.put((byte) 20);
        byteBuffer.put((byte) 30);
        
        // 데이터 쓰기 후 상태 출력
        System.out.println(String.format("데이터 쓰기 후: 용량: %d, 위치: %d, 한계: %d",
                byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));

        // 읽기 모드로 전환
        System.out.println("++++++++++++++++++++++읽기 시작++++++++++++++++++++++++");
        byteBuffer.flip();
        byte a = byteBuffer.get();
        System.out.println("첫 번째 바이트: " + a);
        byte b = byteBuffer.get();
        System.out.println("두 번째 바이트: " + b);

        System.out.println(String.format("2바이트 읽기 후: 용량: %d, 위치: %d, 한계: %d",
                byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));

        // 현재 읽기 모드에서 추가 데이터 쓰기
        byteBuffer.compact();
        ByteBuffer t1 = byteBuffer.put((byte) 30);
        System.out.println("데이터 쓰기: " + t1);
        ByteBuffer t2 = byteBuffer.put((byte) 40);
        System.out.println("데이터 쓰기: " + t2);
        ByteBuffer t3 = byteBuffer.put((byte) 50);
        System.out.println("데이터 쓰기: " + t3);

        System.out.println(String.format("최종 결과: 용량: %d, 위치: %d, 한계: %d",
                byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));
    }
}

2.1.3 ByteBuffer 메모리 모델 ByteBuffer는 성능에 민감한 코드를 위해 직접 메모리(direct heap 외)와 비직접 메모리(heap 내) 두 가지 구현을 제공합니다. 직접 메모리는 다음과 같이 생성합니다:

ByteBuffer directBuffer = ByteBuffer.allocateDirect(size);

장점:

  • 네트워크 IO 또는 파일 IO 시 heapBuffer보다 복사 횟수가 한 번 적음
  • GC 범위 외에서 GC 부담 감소, 자동 관리 구현

2.2 채널(Channel) BIO에서는 OutputStream과 InputStream을 통해 데이터를 읽고 썼지만, NIO에서는 채널 하나만으로 모든 작업을 처리할 수 있습니다.

채널 API는 UDP/TCP 네트워크와 파일 IO를 모두 다룹니다:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

표준 IO Stream과의 차이점:

  • 단일 채널 내에서 읽기와 쓰기 가능
  • Stream은 단방향
  • 비차단 읽기와 쓰기 가능
  • 채널은 항상 버퍼를 읽거나 씀

2.2.1 SocketChannel 소개 SocketChannel는 TCP 네트워크 연결을 설정하며, java.net.Socket과 유사합니다. 두 가지 생성 방식:

  • 클라이언트가 서버에 연결 요청
  • 서버가 수신한 새 연결

2.2.2 ServerSocketChannel 소개 ServerSocketChannel는 새로운 TCP 연결 채널을 수신합니다. ServerSocket과 유사합니다. serverSocketChannel.accept(): 채널이 비차단 모드일 때, 대기 중인 연결이 없으면 null을 반환하므로 반환된 SocketChannel가 null인지 확인해야 합니다.

예제 코드 2.2.1 (서버):

package com.java.nio.example;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NIOServer {
    public static void main(String[] args) throws Exception {
        // 서버 소켓 채널 생성
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 차단 모드에서 비차단 모드로 변경
        serverSocketChannel.configureBlocking(false);
        // 포트 바인딩
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        System.out.println("서버 시작");
        
        while (true) {
            // TCP 연결 채널 수신
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                System.out.println("새 연결 수신: " + socketChannel.getRemoteAddress());
                // 비차단 모드 설정
                socketChannel.configureBlocking(false);
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                // 연결 후 데이터가 없을 때 계속 루프 실행 문제
                while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {
                    if (buffer.position() > 0) {
                        break;
                    }
                }
                
                if (buffer.position() == 0) {
                    continue;
                }
                
                buffer.flip();
                byte[] content = new byte[buffer.limit()];
                buffer.get(content);
                System.out.println(new String(content));
                System.out.println("수신 데이터: " + socketChannel.getRemoteAddress());

                String response = "HTTP/1.1 200 OK\r\n" +
                        "Content-Length: 11\r\n\r\n" +
                        "Hello World";

                ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());

                while (responseBuffer.hasRemaining()) {
                    socketChannel.write(responseBuffer);
                }
            }
        }
    }
}

문제 해결을 위한 개선 예제 2.2.2:

package com.java.nio.example;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ImprovedNIOServer {
    // 연결된 채널 목록
    private static List<SocketChannel> clientChannels = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        // 서버 소켓 채널 생성
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 차단 모드에서 비차단 모드로 변경
        serverSocketChannel.configureBlocking(false);
        // 포트 바인딩
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        System.out.println("서버 시작");
        
        while (true) {
            // 새 연결 시도
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                System.out.println("새 연결 수신: " + socketChannel.getRemoteAddress());
                // 비차단 모드 설정
                socketChannel.configureBlocking(false);
                clientChannels.add(socketChannel);
            } else {
                // 기존 연결 처리
                Iterator<SocketChannel> iterator = clientChannels.iterator();
                while (iterator.hasNext()) {
                    SocketChannel channel = iterator.next();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    
                    if (channel.read(buffer) == 0) {
                        continue;
                    }
                    
                    while (channel.isOpen() && channel.read(buffer) != -1) {
                        if (buffer.position() > 0) {
                            break;
                        }
                    }
                    
                    if (buffer.position() == 0) {
                        continue;
                    }
                    
                    buffer.flip();
                    byte[] content = new byte[buffer.limit()];
                    buffer.get(content);
                    System.out.println(new String(content));
                    System.out.println("수신 데이터: " + channel.getRemoteAddress());

                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";

                    ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());

                    while (responseBuffer.hasRemaining()) {
                        channel.write(responseBuffer);
                    }
                    
                    iterator.remove();
                }
            }
        }
    }
}

예제 코드 2.2.3 (클라이언트):

package com.java.nio.example;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NIOClient {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        
        while (!socketChannel.finishConnect()) {
            Thread.yield();
        }

        Scanner scanner = new Scanner(System.in);
        System.out.println("메시지 입력:");
        String message = scanner.nextLine();
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
        
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        
        // 서버 응답 수신
        System.out.println("서버 응답 수신:");
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

        while (socketChannel.isOpen() && socketChannel.read(responseBuffer) != -1) {
            if (responseBuffer.position() > 0) {
                break;
            }
        }
        
        responseBuffer.flip();
        byte[] content = new byte[responseBuffer.limit()];
        responseBuffer.get(content);
        System.out.println(new String(content));
        
        scanner.close();
        socketChannel.close();
    }
}

2.3 선택기(Selector) Selector는 하나 이상의 NIO 채널을 검사하고 어떤 채널이 읽기 또는 쓰기 준비가 되었는지 결정하는 Java NIO 구성 요소입니다. 이를 통해 단일 스레드로 여러 채널을 관리하고 여러 네트워크 연결을 처리할 수 있습니다.

하나의 스레드가 Selector를 사용하여 여러 채널의 다양한 이벤트를 모니터링하며, 이를 이벤트 드리븐 메커니즘이라고도 합니다.

2.3.1 네 가지 이벤트 네 가지 이벤트는 각각 SelectionKey의 상수에 해당합니다:

  1. 연결 (Connect) - SelectionKey.OP_CONNECT
  2. 수신 준비 (Accept) - OP_ACCEPT
  3. 읽기 (Read) - OP_READ
  4. 쓰기 (Write) - OP_WRITE

Selector를 사용하여 위의 폴링 방식 대체 예제 2.3.1:

package com.java.nio.example;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class SelectorBasedNIOServer {
    public static void main(String[] args) throws Exception {
        // 1. 서버 소켓 채널 생성
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 차단 모드에서 비차단 모드로 변경
        serverSocketChannel.configureBlocking(false);
        
        // 2. Selector 생성 및 채널 등록
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
        
        // 3. 포트 바인딩
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        System.out.println("서버 시작");
        
        while (true) {
            // 채널 폴링 대신 이벤트 폴링 사용
            selector.select(); // 이벤트가 있을 때까지 블로킹
            // 이벤트 가져오기
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            // 결과 순회
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                
                // 읽기와 수신 이벤트 처리
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.attachment();
                    // 클라이언트 연결 채널을 가져와 selector에 등록
                    SocketChannel clientSocketChannel = server.accept();
                    clientSocketChannel.configureBlocking(false);
                    clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
                    System.out.println("새 연결 수신: " + clientSocketChannel.getRemoteAddress());
                }
                
                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.attachment();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    
                    while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {
                        if (buffer.position() > 0) {
                            break;
                        }
                    }
                    
                    if (buffer.position() == 0) {
                        continue;
                    }
                    
                    buffer.flip();
                    byte[] content = new byte[buffer.limit()];
                    buffer.get(content);
                    System.out.println(new String(content));
                    System.out.println("수신 데이터: " + socketChannel.getRemoteAddress());

                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";

                    ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());

                    while (responseBuffer.hasRemaining()) {
                        socketChannel.write(responseBuffer);
                    }
                    
                    key.cancel();
                }
            }
            selector.selectNow();
        }
    }
}
  1. NIO와 BIO의 차이점 BIO(블로킹 I/O)는 각 연결마다 전용 스레드가 필요하므로 많은 수의 연결을 처리하기에는 비효율적입니다. 반면 NIO는 단일 스레드로 여러 연결을 처리할 수 있어 자원 효율성이 높습니다.

  2. Reactor 모델 - NIO와 멀티스레드 결합 개선 방안 Reactor 모델의 핵심 아이디어는 네트워크 연결 처리를 특정 스레드 그룹에 맡기고, IO 읽기/쓰기 작업을 다른 스레드 그룹에 맡기는 것입니다.

예제 코드 4.1:

package com.java.nio.example;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ReactorModelNIOServer {
    // 비즈니스 작업 처리를 위한 스레드 풀
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    // select() 등 이벤트 폴링 코드를 캡슐화한 추상 클래스
    abstract class ReactorThread extends Thread {
        Selector selector;
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        // selector가 이벤트를 감지하면 호출
        public abstract void handler(SelectableChannel channel) throws Exception;

        private ReactorThread() throws IOException {
            selector = Selector.open();
        }

        volatile boolean running = false;

        @Override
        public void run() {
            // selector 이벤트 폴링
            while (running) {
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
                
                try {
                    selector.select(1000);
                    
                    // 선택된 키 가져오기
                    Set<SelectionKey> selected = selector.selectedKeys();
                    // 결과 순회
                    Iterator<SelectionKey> iter = selected.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        int readyOps = key.readyOps();
                        
                        // 읽기와 수신 이벤트 처리
                        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                            try {
                                SelectableChannel channel = (SelectableChannel) key.attachment();
                                channel.configureBlocking(false);
                                handler(channel);
                                if (!channel.isOpen()) {
                                    key.cancel();
                                }
                            } catch (Exception ex) {
                                key.cancel();
                            }
                        }
                    }
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private SelectionKey register(SelectableChannel channel) throws Exception {
            FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
            taskQueue.add(futureTask);
            return futureTask.get();
        }

        private void doStart() {
            if (!running) {
                running = true;
                start();
            }
        }
    }

    private ServerSocketChannel serverSocketChannel;
    // 1. 수신 처리 reactor 스레드 (수신 스레드)
    private ReactorThread[] mainReactorThreads = new ReactorThread[1];
    // 2. IO 처리 reactor 스레드 (IO 스레드)
    private ReactorThread[] subReactorThreads = new ReactorThread[8];

    /**
     * 스레드 그룹 초기화
     */
    private void newGroup() throws IOException {
        // IO 스레드 생성 - 클라이언트 연결 후 socketChannel의 IO 읽기/쓰기 처리
        for (int i = 0; i < subReactorThreads.length; i++) {
            subReactorThreads[i] = new ReactorThread() {
                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    // IO 스레드는 IO 처리만 담당, 수신 이벤트 처리 안 함
                    SocketChannel ch = (SocketChannel) channel;
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    
                    while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                        // 연결 유지 시 데이터 읽기 종료 여부 판단 (간단히 0바이트 초과 시 요청 종료)
                        if (requestBuffer.position() > 0) break;
                    }
                    
                    if (requestBuffer.position() == 0) return; // 데이터 없음
                    
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println(Thread.currentThread().getName() + " 수신 데이터: " + ch.getRemoteAddress());

                    // TODO 비즈니스 작업 (DB, API 호출 등...)
                    workPool.submit(() -> {
                        // 비즈니스 로직 처리
                    });

                    // 응답 결과 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    
                    while (buffer.hasRemaining()) {
                        ch.write(buffer);
                    }
                }
            };
        }

        // mainReactor 스레드 생성 - serverSocketChannel만 처리
        for (int i = 0; i < mainReactorThreads.length; i++) {
            mainReactorThreads[i] = new ReactorThread() {
                AtomicInteger counter = new AtomicInteger(0);

                @Override
                public void handler(SelectableChannel channel) throws Exception {
                    // 요청 분기만 담당, 실제 데이터 읽기는 하지 않음
                    ServerSocketChannel ch = (ServerSocketChannel) channel;
                    SocketChannel socketChannel = ch.accept();
                    socketChannel.configureBlocking(false);
                    
                    // 연결 수신 후 IO 스레드에 데이터 읽기 계속 전달
                    int index = counter.getAndIncrement() % subReactorThreads.length;
                    ReactorThread workEventLoop = subReactorThreads[index];
                    workEventLoop.doStart();
                    SelectionKey selectionKey = workEventLoop.register(socketChannel);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    
                    System.out.println(Thread.currentThread().getName() + " 새 연결 수신: " + socketChannel.getRemoteAddress());
                }
            };
        }
    }

    /**
     * 채널 초기화 및 eventLoop 스레드 바인딩
     */
    private void initAndRegister() throws Exception {
        // ServerSocketChannel 생성
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        
        // serverSocketChannel을 selector에 등록
        int index = new Random().nextInt(mainReactorThreads.length);
        mainReactorThreads[index].doStart();
        SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }

    /**
     * 포트 바인딩
     */
    private void bind() throws IOException {
        // 포트 바인딩 및 서비스 시작
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("서버 시작 완료, 포트 8080");
    }

    public static void main(String[] args) throws Exception {
        ReactorModelNIOServer nioServer = new ReactorModelNIOServer();
        nioServer.newGroup(); // 1. main과 sub 두 그룹의 스레드 생성
        nioServer.initAndRegister(); // 2. serverSocketChannel 생성 및 mainReactor 스레드의 selector에 등록
        nioServer.bind(); // 3. serverSocketChannel에 포트 바인딩
    }
}

위 예제에서 볼 수 있듯이, 연결 생성 스레드와 데이터 처리 스레드가 다릅니다.

  1. NIO 적용 시나리오 NIO는 다음과 같은 시나리오에 적합합니다:
  2. 대량의 동시 연결 처리
  3. 대용량 파일 전송
  4. 네트워크 IO 처리

태그: Java NIO Buffer Channel Selector Reactor Pattern Non-blocking I/O

6월 15일 17:34에 게시됨