파이썬 네트워크 프로그래밍과 멀티프로세스/멀티스레드 개발

네트워크 프로그래밍

OSI 7계층 모델 웹사이트에 요청을 보내는 예를 통해 각 계층의 역할을 설명합니다:

  • 응용 계층: 데이터 형식을 정의합니다.
"GET /s?wd=안녕 HTTP/1.1\r\nHost:www.naver.com\r\n\r\n"
  • 표시 계층: 애플리케이션 데이터의 인코딩, 압축/해압축, 분할, 암호화/복호화를 담당합니다.
"GET /s?wd=안녕 HTTP/1.1\r\nHost:www.naver.com\r\n\r\n안녕".encode('utf-8')
  • 세션 계층: 연결 설정 및 종료를 처리합니다.
# 연결 요청 후 데이터 전송, 전송 완료 후 연결 종료
  • 전송 계층: 포트 간 통신을 설정합니다.
데이터: "GET /s?wd=안녕 HTTP/1.1\r\nHost:www.naver.com\r\n\r\n안녕".encode('utf-8')
포트:
    - 대상: 80
    - 로컬: 6784
  • 네트워크 계층: IP 주소 정보를 지정합니다.
IP:
    - 대상IP: 110.242.68.3 (네이버)
    - 로컬IP: 192.168.10.1
  • 데이터링크 계층: MAC 주소를 설정합니다.
MAC:
    - 대상MAC: FF-FF-FF-FF-FF-FF 
    - 로컬MAC: 11-9d-d8-1a-dd-cd
  • 물리 계층: 이진 데이터를 물리 매체로 전송합니다.
# 케이블을 통해 이진 데이터 전송

각 계층은 특정 역할을 수행하며, 최종적으로 사용자에게 데이터가 전달됩니다. 발송 시 데이터는 7개의 층으로 감싸져 전달되며, 수신 시 각 층에서 해제됩니다. 실제로는 네트워크 장치에서 처리하는 부분이 많습니다.

import socket

socket_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_client.connect(('110.242.68.3', 80)) # 서버로 데이터 패킷 전송


key = "안녕"
# 응용 계층
content = f"GET /s?wd={key} HTTP/1.1\r\nHost:www.naver.com\r\n\r\n"
# 표시 계층
content = content.encode("utf-8")

socket_client.sendall(content)
result = socket_client.recv(8196)
print(result.decode('utf-8'))

# 세션 계층 & 전송 계층
socket_client.close()

TCP와 UDP의 차이점

TCP 서버 코드

import socket
# TCP 인스턴스 생성
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 서버 IP 및 포트 바인딩
sock.bind(("127.0.0.1", 8080))
# 포트监听 시작
sock.listen(5)
# 연결 대기
while True:
    conn, addr = sock.accept()
    # 메시지 수신
    msg = conn.recv(1024)
    # 연결 종료
    conn.close()
    break
# 서버 종료
sock.close()

TCP 클라이언트 코드

import socket
# TCP 인스턴스 생성
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 서버 IP 및 포트 연결
client.connect(("127.0.0.1", 8080))
# 메시지 전송 및 수신
client.sendall(b"안녕")
client.recv(1024)
# 연결 종료
client.close()

UDP 서버 코드

import socket
# UDP 인스턴스 생성
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 서버 IP 및 포트 바인딩
sock.bind(("127.0.0.1", 5050))
while True:
    # 메시지 수신
    data, (host, port) = sock.recvfrom(1024)

UDP 클라이언트 코드

import socket
# UDP 인스턴스 생성
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 메시지 전송 및 수신
while True:
    client.sendto(b"안녕", ("127.0.0.1", 8080))
    data, (host, port) = client.recvfrom(1024)
# 연결 종료
client.close()

TCP와 UDP의 차이점

  1. 연결 생성 과정이 다르며, TCP는 3차 핸드셰이크 및 4차 핸드셰이크를 거치고 UDP는 직접 전송합니다.
  2. TCP는 send/sendall 및 recv로 메시지를 처리하고, UDP는 sendto와 recvfrom을 사용합니다.
  3. TCP는 연결 후만 메시지 전송이 가능하므로 단순한 데이터 전송이고, UDP는 IP 및 포트 정보를 함께 전송해야 합니다.

TCP 3차 핸드셰이크 및 4차 핸드셰이크 다이어그램

3차 핸드셰이크
      클라이언트                                                서버

  1.  SYN-SENT    --> <seq=100><CTL=SYN>               --> SYN-RECEIVED

  2.  ESTABLISHED <-- <seq=300><ack=101><CTL=SYN,ACK>  <-- SYN-RECEIVED

  3.  ESTABLISHED --> <seq=101><ack=301><CTL=ACK>       --> ESTABLISHED
4차 핸드셰이크
       TCP A                                                TCP B

  1.  FIN-WAIT-1  --> <seq=100><ack=300><CTL=FIN,ACK>  --> CLOSE-WAIT

  2.  FIN-WAIT-2  <-- <seq=300><ack=101><CTL=ACK>      <-- CLOSE-WAIT

  3.  TIME-WAIT   <-- <seq=300><ack=101><CTL=FIN,ACK>  <-- LAST-ACK

  4.  TIME-WAIT   --> <seq=101><ack=301><CTL=ACK>      --> CLOSED

TCP 패킷 붙음 현상 데이터 전송 시 두 개의 데이터 패킷이 하나로 합쳐질 수 있습니다:

# 클라이언트(보내는 쪽)
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8001))

client.sendall('alex가 먹고'.encode('utf-8'))
client.sendall('있어요'.encode('utf-8'))

client.close()


# 서버(받는 쪽)
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 8001))
sock.listen(5)
conn, addr = sock.accept()

client_data = conn.recv(1024)
print(client_data.decode('utf-8'))

conn.close()
sock.close()

패킷 붙음 문제 해결 방법 모든 메시지 앞에 고정된 헤더(예: 4바이트 길이 정보)를 추가합니다:

import struct
# ########### 4바이트로 고정된 숫자 변환 ###########
v1 = struct.pack('i', 199)
print(v1)  # b'\xc7\x00\x00\x00'

for item in v1:
    print(item, bin(item))

# ########### 4바이트 숫자로 변환 ###########
v2 = struct.unpack('i', v1) # v1= b'\xc7\x00\x00\x00'
print(v2) # (199,)
# -------------------서버--------------------
import socket
import struct

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 8001))
sock.listen(5)
conn, addr = sock.accept()

# 4바이트 읽기
header1 = conn.recv(4)
data_length1 = struct.unpack('i', header1)[0] # 데이터 바이트 길이 21
has_recv_len = 0
data1 = b""
while True:
    length = data_length1 - has_recv_len
    if length > 1024:
        lth = 1024
    else:
        lth = length
    chunk = conn.recv(lth) # 한번에 받지 못할 수도 있으므로 다시 읽음
    data1 += chunk
    has_recv_len += len(chunk)
    if has_recv_len == data_length1:
        break
print(data1.decode('utf-8'))

# 4바이트 읽기
header2 = conn.recv(4)
data_length2 = struct.unpack('i', header2)[0] # 데이터 바이트 길이
data2 = conn.recv(data_length2) # 길이
print(data2.decode('utf-8'))

conn.close()
sock.close()

# -------------------------클라이언트-------------------------
import socket
import struct

client = socket.socket()
client.connect(('127.0.0.1', 8001))

# 첫 번째 데이터
data1 = 'alex가 먹고'.encode('utf-8')

header1 = struct.pack('i', len(data1))

client.sendall(header1)
client.sendall(data1)

# 두 번째 데이터
data2 = '있어요'.encode('utf-8')
header2 = struct.pack('i', len(data2))
client.sendall(header2)
client.sendall(data2)

client.close()

사례: 메시지 및 파일 업로드

서버

import os
import json
import socket
import struct


def recv_data(conn, chunk_size=1024):
    # 헤더 정보 읽기: 데이터 길이
    has_read_size = 0
    bytes_list = []
    while has_read_size < 4:
        chunk = conn.recv(4 - has_read_size)
        has_read_size += len(chunk)
        bytes_list.append(chunk)
    header = b"".join(bytes_list)
    data_length = struct.unpack('i', header)[0]

    # 데이터 읽기
    data_list = []
    has_read_data_size = 0
    while has_read_data_size < data_length:
        size = chunk_size if (data_length - has_read_data_size) > chunk_size else data_length - has_read_data_size
        chunk = conn.recv(size)
        data_list.append(chunk)
        has_read_data_size += len(chunk)

    data = b"".join(data_list)

    return data


def recv_file(conn, save_file_name, chunk_size=1024):
    save_file_path = os.path.join('files', save_file_name)
    # 헤더 정보 읽기: 데이터 길이
    has_read_size = 0
    bytes_list = []
    while has_read_size < 4:
        chunk = conn.recv(4 - has_read_size)
        bytes_list.append(chunk)
        has_read_size += len(chunk)
    header = b"".join(bytes_list)
    data_length = struct.unpack('i', header)[0]

    # 데이터 읽기
    file_object = open(save_file_path, mode='wb')
    has_read_data_size = 0
    while has_read_data_size < data_length:
        size = chunk_size if (data_length - has_read_data_size) > chunk_size else data_length - has_read_data_size
        chunk = conn.recv(size)
        file_object.write(chunk)
        file_object.flush()
        has_read_data_size += len(chunk)
    file_object.close()


def run():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # IP 재사용 설정
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    sock.bind(('127.0.0.1', 8001))
    sock.listen(5)
    while True:
        conn, addr = sock.accept()

        while True:
            # 메시지 유형 읽기
            message_type = recv_data(conn).decode('utf-8')
            if message_type == 'close':  # 종료 신호
                print("연결 종료")
                break
            # 파일: {'msg_type':'file', 'file_name':"xxxx.xx" }
            # 메시지: {'msg_type':'msg'}
            message_type_info = json.loads(message_type)
            if message_type_info['msg_type'] == 'msg':
                data = recv_data(conn)
                print("수신된 메시지:", data.decode('utf-8'))
            else:
                file_name = message_type_info['file_name']
                print("수신된 파일, 저장 경로:", file_name)
                recv_file(conn, file_name)

        conn.close()
    sock.close()


if __name__ == '__main__':
    run()

클라이언트

import os
import json
import socket
import struct


def send_data(conn, content):
    data = content.encode('utf-8')
    header = struct.pack('i', len(data))
    conn.sendall(header)
    conn.sendall(data)


def send_file(conn, file_path):
    file_size = os.stat(file_path).st_size
    header = struct.pack('i', file_size)
    conn.sendall(header)

    has_send_size = 0
    file_object = open(file_path, mode='rb')
    while has_send_size < file_size:
        chunk = file_object.read(2048)
        conn.sendall(chunk)
        has_send_size += len(chunk)
    file_object.close()


def run():
    client = socket.socket()
    client.connect(('127.0.0.1', 8001))

    while True:
        """
        메시지 전송 형식:
            - 메시지: msg|안녕하세요
            - 파일: file|test.png
        """
        content = input(">>>")  # msg 또는 file
        if content.upper() == 'Q':
            send_data(client, "close")
            break
        input_text_list = content.split('|')
        if len(input_text_list) != 2:
            print("잘못된 형식입니다.")
            continue

        message_type, info = input_text_list

        # 메시지 전송
        if message_type == 'msg':

            # 메시지 유형 전송
            send_data(client, json.dumps({"msg_type": "msg"}))

            # 내용 전송
            send_data(client, info)

        # 파일 전송
        else:
            file_name = info.rsplit(os.sep, maxsplit=1)[-1]

            # 메시지 유형 전송
            send_data(client, json.dumps({"msg_type": "file", 'file_name': file_name}))

            # 내용 전송
            send_file(client, info)

    client.close()


if __name__ == '__main__':
    run()

차단 및 비차단 기본적으로 네트워크 프로그래밍은 차단 상태로 작동합니다:

# ################### 소켓 서버 ###################
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 8001))
sock.listen(5)
# 차단
conn, addr = sock.accept()
# 차단
client_data = conn.recv(1024)
print(client_data.decode('utf-8'))
conn.close()
sock.close()

# ################### 소켓 클라이언트 ###################
import socket
client = socket.socket()

# 차단
client.connect(('127.0.0.1', 8001))
client.sendall('alex가 먹고있어요'.encode('utf-8'))
client.close()

비차단으로 변경하려면 sock.setblocking(False)를 추가하면 됩니다. 하지만 비차단 상태에서 accept, recv, connect가 실행되면 BlockingIOError가 발생합니다. 이것은 코드 오류가 아니라 차단 상태가 해제되었기 때문에 발생하는 일반적인 예외입니다. 비차단 코드는 일반적으로 IO 복수 활용과 결합하여 더 많은 성능을 얻을 수 있습니다.

IO 복수 활용 IO 복수 활용은 여러 디스크립터를 모니터링하여 어떤 디스크립터가 준비되었는지 확인하는 기법입니다. IO 복수 활용 + 비차단은 TCP 서버가 여러 클라이언트를 동시에 처리할 수 있도록 합니다:

# ################### 소켓 서버 ###################
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)  # 비차단 설정
server.bind(('127.0.0.1', 8001))
server.listen(5)

inputs = [server, ] # 소켓 객체 목록 -> [server, 첫 번째 클라이언트 연결 conn ]

while True:
    # 입력 리스트에서 준비된 소켓을 가져옵니다.
    r, w, e = select.select(inputs, [], [], 0.05)
    for sock in r:
        # server
        if sock == server:
            conn, addr = sock.accept() # 새로운 연결 수신
            print("새로운 연결")
            inputs.append(conn)
        else:
            data = sock.recv(1024)
            if data:
                print("수신된 메시지:", data)
            else:
                print("연결 종료")
                inputs.remove(sock)
    # 다른 작업 수행 20초

IO 복수 활용 개념 보완 IO 복수 활용 + 비차단을 사용하면 소켓 서버 및 클라이언트 모두 성능을 향상시킬 수 있습니다. IO 복수 활용은 소켓 객체가 변화했는지 모니터링합니다. 비차단은 connect, recv 과정에서 대기하지 않습니다. 주의: IO 복수 활용은 IO 객체의 상태 변화만 모니터링하며, 파일 읽기/쓰기, 터미널 장치, 네트워크 요청 등에 적용됩니다. Linux에서는 IO 복수 활용이 세 가지 모드로 구현되어 있습니다: select, poll, epoll. (Windows는 select만 지원)

멀티프로세스 및 멀티스레드

GIL (글로벌 인터프리터 락) CPython 인터프리터에서 존재하는 락으로, 프로세스 내에서 한 번에 하나의 스레드만 CPU에 의해 실행될 수 있게 합니다.

멀티프로세스의 생성 방식 및 차이점 프로세스 간은 서로 독립적입니다. Python에서는 멀티프로세스를 통해 CPU의 다중 커널을 활용할 수 있으며, 계산 집약형 작업에는 적합합니다. Python에서 multiprocessing 모듈을 사용한 프로세스 조작:

import multiprocessing
import time

def task():
    print(name)
    name.append(123)


if __name__ == '__main__':
    multiprocessing.set_start_method("fork")  # fork, spawn, forkserver
    name = []
    p1 = multiprocessing.Process(target=task)
    p1.start()

    time.sleep(2)
    print(name)  # []

멀티프로세스 간 데이터 공유 및 교환 프로세스 간은 서로 독립적이므로 데이터 공유 및 교환을 위해 Python은 다음과 같은 4가지 방법을 제공합니다:

  1. Value 및 Array 방법: 드물게 사용됩니다. Value 및 Array는 하위 C 언어로 구현되어 데이터 타입이 정의되면 수정 불가능합니다. Array는 배열이며, 두 가지 제약이 있습니다:
  2. 타입 정의 후 수정 불가능
  3. 배열 길이 정의 후 수정 불가능

관리자 방법: 일반적으로 사용됩니다. Manager는 리스트 및 딕셔너리 데이터 타입을 정의할 수 있습니다.

큐: 일반적으로 사용됩니다.

import multiprocessing

def task(q):
    for i in range(10):
        q.put(i)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    
    p = multiprocessing.Process(target=task, args=(queue,))
    p.start()
    p.join()

    print("메인 프로세스")
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get())

파이프: 큐는 A→B 방향의 단방향 전송이고, 파이프는 양방향 전송입니다.

import time
import multiprocessing


def task(conn):
    time.sleep(1)
    conn.send([111, 22, 33, 44])
    data = conn.recv() # 차단
    print("서브프로세스 수신:", data)
    time.sleep(2)


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=task, args=(child_conn,))
    p.start()

    info = parent_conn.recv() # 차단
    print("메인 프로세스 수신:", info)
    parent_conn.send(666)

Python 공식 외에도 MySQL, Redis 등의 데이터 공유 메커니즘도 제공됩니다.

멀티프로세스 및 멀티스레드 호출, 차단, 주 프로세스(스레드) 이름 설정

  1. 두 가지 모두 start 메서드로 시작되고 join 메서드로 차단됩니다.
  2. 멀티프로세스 및 멀티스레드 모두 주 프로세스(스레드)를 보호하는 방법이 있으며, 기본값은 false입니다.
  3. 멀티프로세스 및 멀티스레드의 주 프로세스(스레드) 이름 설정 및 획득 방법.

자정 멀티프로세스(스레드) 클래스

# ----------------------------------------멀티프로세스----------------------------------
import threading


class MyThread(threading.Thread):
    def run(self):
        print('실행 중', self._args) # self._args는 args로 전달된 값


t = MyThread(args=(100,))
t.start()

멀티프로세스(스레드)의 프로세스(스레드)풀 및 잠금 및 스레드 안전성 프로세스 내에서 여러 스레드가 있을 수 있으며, 여러 스레드가 동일한 "것"을 처리할 때 데이터 혼란이 발생할 수 있습니다. 멀티프로세스는 프로세스 간 데이터가 격리되지만, 데이터 공유를 통해 데이터 혼란이 발생할 수 있습니다. 따라서 잠금을 사용하여 데이터 안정성을 보장해야 합니다.

잠금

  1. 동기화 잠금 threading.Lock() multiprocessing.Lock()
  2. 재귀 잠금 threading.RLock() multiprocessing.RLock()
  3. 죽음락: 자원 경쟁 또는 상호 통신으로 인한 차단 현상.

멀티프로세스(스레드) 잠금 후 코드 예시

# ----------------------------------------멀티프로세스spawn 모드 예시1----------------------------------
import time
import multiprocessing
import os


def task(lock):
    print("시작")
    lock.acquire()
    # 파일에서 저장된 값을 읽습니다: 10
    with open('f1.txt', mode='r', encoding='utf-8') as f:
        current_num = int(f.read())

    print(os.getpid(), "줄을 줍다")
    time.sleep(0.5)
    current_num -= 1

    with open('f1.txt', mode='w', encoding='utf-8') as f:
        f.write(str(current_num))
    lock.release()


if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")
    lock = multiprocessing.RLock()

    process_list = []
    for i in range(10):
        p = multiprocessing.Process(target=task, args=(lock,))
        p.start()
        process_list.append(p)

    for item in process_list:
        item.join()

멀티프로세스(스레드) 풀 및 잠금 및 스레드 안전성

# --------------------멀티스레드 예시1----------------------------------
import threading

lock_object = threading.RLock()

loop = 10000000
number = 0


def _add(count):
    lock_object.acquire() # 잠금
    global number
    for i in range(count):
        number += 1
    lock_object.release() # 잠금 해제


def _sub(count):
    lock_object.acquire() # 잠금
    global number
    for i in range(count):
        number -= 1
    lock_object.release() # 잠금 해제


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1 스레드 실행 완료 후 다음으로 진행
t2.join()  # t2 스레드 실행 완료 후 다음으로 진행

print(number)

태그: python Socket Programming TCP/IP multiprocessing Multithreading

7월 4일 22:59에 게시됨