I/O 멀티플렉싱은 하나의 프로세스가 여러 파일 디스크립터(네트워크 소켓, 파일 등)의 상태를 동시에 감시할 수 있게 해주는 기술입니다. 커널이 감시 중인 디스크립터의 상태 변화를 감지하면 플래그를 변경하고, 프로세스는 이를 감지하여 읽기 또는 쓰기 작업을 수행합니다.
트리거 방식
레벨 트리거 (Level Triggered)
준비 상태가 된 파일 디스크립터를 프로세스에 알린 후, 프로세스가 해당 디스크립터에 대해 I/O 작업을 수행하지 않으면 다음 select() 또는 poll() 호출 시 다시 보고됩니다. 이 방식은 준비된 메시지를 놓치지 않지만 시스템 자원 소모가 증가할 수 있습니다.
에지 트리거 (Edge Triggered)
파일 디스크립터가 막 준비 상태로 변경된 경우에만 한 번 알립니다. 프로세스가 즉시 조치를 취하지 않으면 다시 알리지 않습니다. 이론적으로 성능이 더 우수하지만 구현이 상대적으로 복잡합니다.
블로킹과 논블로킹
- 블로킹 모드: 데이터가 도착할 때까지 대기합니다.
- 논블로킹 모드: 데이터가 있으면 반환하고, 없으면 즉시 에러를 반환합니다.
동기와 비동기 I/O
- 동기 I/O: 요청 후 응답을 기다립니다. 블로킹이면 데이터가 올 때까지 대기하고, 논블로킹이면 즉시 결과를 반환합니다.
- 비동기 I/O: 요청 후 다른 작업을 수행하다가 처리가 완료되면 알림을 받습니다.
select, poll, epoll 비교
select
1983년 4.2BSD에서 처음 등장했습니다. select() 시스템 콜을 통해 파일 디스크립터 배열을 감시하고, 준비된 디스크립터의 플래그를 커널이 수정합니다. 가장 큰 장점은 거의 모든 플랫폼에서 지원된다는 점입니다.
단점:
- 단일 프로세스가 감시할 수 있는 파일 디스크립터 수에 제한이 있습니다. 리눅스의 경우 기본값이 1024입니다.
- 파일 디스크립터 수가 증가하면 데이터 복사 비용이 선형적으로 증가합니다.
- 모든 소켓을 선형 스캔하므로 비활성 연결이 많은 경우 오버헤드가 발생합니다.
poll
1986년 System V Release 3에서 등장했습니다. select와 유사하지만 파일 디스크립터 수에 제한이 없습니다. 그러나 사용자 공간과 커널 공간 사이에 전체 배열이 복사되는 비용은 여전히 선형적으로 증가합니다. select와 마찬가지로 레벨 트리거 방식만 지원합니다.
epoll
리눅스 2.6에서 도입된 개선된 메커니즘입니다.
- 레벨 트리거와 에지 트리거를 모두 지원합니다.
- 준비된 파일 디스크립터만 알려줍니다.
- 메모리 매핑(mmap) 기술을 사용하여 파일 디스크립터 복사 비용을 제거합니다.
- 이벤트 기반 알림 방식을 사용합니다. epoll_ctl()로 등록한 디스크립터가 준비되면 콜백 메커니즘으로 활성화됩니다.
Python select 예제
select.select(rlist, wlist, xlist[, timeout]) 함수는 읽기, 쓰기, 예외 감시를 위한 세 개의 리스트와 선택적 타임아웃 값을 받습니다. 세 개의 튜플을 반환하며, 각 튜플은 준비된 객체 리스트입니다.
서버 코드
import select
import socket
import queue
# TCP 소켓 생성 및 설정
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ('0.0.0.0', 10001)
server.bind(server_address)
server.listen(10)
# 감시 리스트 초기화
inputs = [server]
outputs = []
message_queues = {}
timeout = 20
while inputs:
print("이벤트 대기 중...")
readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
# 타임아웃 발생 시
if not (readable or writable or exceptional):
print("타임아웃 발생!")
break
# 읽기 가능 소켓 처리
for sock in readable:
if sock is server:
# 새 연결 수락
connection, client_address = sock.accept()
print(f" 연결 수락: {client_address}")
connection.setblocking(False)
inputs.append(connection)
message_queues[connection] = queue.Queue()
else:
data = sock.recv(1024)
if data:
print(f" 수신 데이터: {data} from {sock.getpeername()}")
message_queues[sock].put(data)
if sock not in outputs:
outputs.append(sock)
else:
# 연결 종료 처리
print(f" 연결 종료: {sock.getpeername()}")
if sock in outputs:
outputs.remove(sock)
inputs.remove(sock)
sock.close()
del message_queues[sock]
# 쓰기 가능 소켓 처리
for sock in writable:
try:
next_msg = message_queues[sock].get_nowait()
except queue.Empty:
print(f" {sock.getpeername()} 큐 비어있음")
outputs.remove(sock)
else:
print(f" 전송: {next_msg} to {sock.getpeername()}")
sock.send(next_msg)
# 예외 소켓 처리
for sock in exceptional:
print(f" 예외 발생: {sock.getpeername()}")
inputs.remove(sock)
if sock in outputs:
outputs.remove(sock)
sock.close()
del message_queues[sock]
클라이언트 코드
import socket
messages = ["첫 번째 메시지", "두 번째 메시지", "세 번째 메시지"]
server_address = ('127.0.0.1', 10001)
# 여러 소켓 생성 및 연결
socks = []
for i in range(10):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
socks.append(sock)
counter = 0
for message in messages:
for sock in socks:
counter += 1
msg = f"{message} 버전 {counter}"
print(f" {sock.getpeername()} 전송: {msg}")
sock.send(msg.encode())
for sock in socks:
data = sock.recv(1024)
print(f" {sock.getpeername()} 수신: {data.decode()}")
if not data:
print(f"소켓 종료: {sock.getpeername()}")
sock.close()
Python poll 예제
poll은 select와 유사하지만 파일 디스크립터 수 제한이 없고 밀리초 단위 타임아웃을 지원합니다.
import socket
import select
import queue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ('0.0.0.0', 10001)
server.bind(server_address)
server.listen(5)
message_queues = {}
timeout = 1000 # 밀리초
# 이벤트 마스크 정의
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
READ_WRITE = READ_ONLY | select.POLLOUT
# poller 설정
poller = select.poll()
poller.register(server, READ_ONLY)
fd_to_socket = {server.fileno(): server}
while True:
print("이벤트 대기 중...")
events = poller.poll(timeout)
print(f"발생 이벤트 수: {len(events)}")
for fd, flag in events:
sock = fd_to_socket[fd]
if flag & (select.POLLIN | select.POLLPRI):
if sock is server:
connection, client_address = sock.accept()
print(f" 연결 수락: {client_address}")
connection.setblocking(False)
fd_to_socket[connection.fileno()] = connection
poller.register(connection, READ_ONLY)
message_queues[connection] = queue.Queue()
else:
data = sock.recv(1024)
if data:
print(f" 수신: {data} from {sock.getpeername()}")
message_queues[sock].put(data)
poller.modify(sock, READ_WRITE)
else:
print(f" 연결 종료: {sock.getpeername()}")
poller.unregister(sock)
sock.close()
del message_queues[sock]
elif flag & select.POLLHUP:
print(f" HUP 종료: {sock.getpeername()}")
poller.unregister(sock)
sock.close()
elif flag & select.POLLOUT:
try:
next_msg = message_queues[sock].get_nowait()
except queue.Empty:
print(f" {sock.getpeername()} 큐 비어있음")
poller.modify(sock, READ_ONLY)
else:
print(f" 전송: {next_msg} to {sock.getpeername()}")
sock.send(next_msg if isinstance(next_msg, bytes) else next_msg.encode())
elif flag & select.POLLERR:
print(f" 오류 발생: {sock.getpeername()}")
poller.unregister(sock)
sock.close()
del message_queues[sock]