Python 동시성 및 네트워킹 프로그래밍 고급

사례 소개

온라인 쇼핑몰 회사에서 연말연시를 맞아, 고객 주문을 시뮬레이션하여 인터페이스를 통해 10만 건의 신규 주문을 생성해야 한다고 가정해 봅시다. 어떻게 처리하시겠습니까?

솔루션 탐색

  • 순차 실행 (Serial Execution): 여러 작업이 순서대로 실행되는 방식입니다. 이전 작업이 완료되기 전까지 다음 작업은 차단됩니다.
  • 동시성 (Concurrency): 여러 작업이 번갈아 가며 실행되는 방식입니다. 작업들이 동시에 실행되는 것처럼 보일 수 있으며, 실행 시간이 겹칠 수 있습니다.
  • 병렬성 (Parallelism): 여러 작업이 실제로 동시에 실행되는 방식입니다. 작업 실행 시간이 겹치지 않습니다.

Python에서의 멀티태스킹

  • 순차 실행
  • 멀티스레딩 / 스레드 풀
  • 멀티프로세싱 / 프로세스 풀
  • 코루틴

멀티스레딩

멀티스레딩은 단일 프로세스 내에서 여러 개의 독립적인 작업을 동시에 실행하는 것을 의미합니다. 각 작업을 스레드라고 하며, 스레드 간에는 해당 프로세스의 변수, 메모리 등의 자원을 공유합니다. 멀티스레딩은 단일 CPU의 처리 능력을 최대한 활용하여 프로그램의 동시성과 응답 속도를 향상시키고, 블로킹 및 대기 문제를 방지할 수 있습니다.

멀티스레딩 사용 방법

  1. 작업 함수 작성: def task_function(url): pass
  2. 스레드 객체 생성: t = Thread(target=task_function)
  3. 스레드 시작: t.start()
  4. 메인 스레드와 결합: t.join()

from threading import Thread
import requests

def fetch_url(url):
    response = requests.get(url)
    print(response.text)
    return response

# 단일 스레드 예시
thread = Thread(target=fetch_url, args=('https://postman-echo.com/get',))
thread.start()
thread.join() # 메인 스레드를 차단하여 thread 스레드가 완료될 때까지 대기

# 여러 스레드 예시
threads = []
for i in range(10):
    t = Thread(target=fetch_url, args=('https://postman-echo.com/get',))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Thread 객체 속성 및 메서드

초기화 매개변수

  • group: 스레드 그룹 (현재 기능 없음).
  • target: 대상 함수 (필수).
  • name: 스레드 이름.
  • args: 대상 함수의 위치 인수 (튜플 타입).
  • kwargs: 대상 함수의 키워드 인수 (딕셔너리 타입).
  • daemon: 데몬 스레드로 설정할지 여부 (메인 스레드가 종료되면 데몬 스레드는 즉시 종료됨, 불리언 타입).

스레드 관련 메서드

  • start(): 자신(run 메서드)을 실행할 별도의 스레드를 시작합니다.
  • run(): target, argskwargs에 따라 대상 함수를 호출합니다.
  • join(): 현재(메인) 스레드를 차단하고, 자신(스레드)이 종료될 때까지 기다립니다.
  • is_alive(): 스레드가 실행 중인지 여부를 반환합니다.

스레드 상태

  • New: 스레드 객체가 생성될 때 스레드는 New 상태입니다.
  • Ready: 스레드가 start() 메서드를 호출한 후, CPU 스케줄링을 기다리는 Ready 상태가 됩니다.
  • Running: CPU가 스레드를 스케줄링하여 실행할 때 Running 상태가 됩니다.
  • Blocked: 스레드가 I/O 작업과 같은 특정 이벤트가 완료되기를 기다릴 때 Blocked 상태가 됩니다.
  • Dead: 스레드가 실행을 완료하거나 예외가 발생하면 Dead 상태가 됩니다.

스레드 상태 확인

is_alive() 메서드를 사용하여 스레드 상태를 확인할 수 있습니다. 스레드가 Ready, Running 또는 Blocked 상태이면 is_alive() 메서드는 True를 반환하고, 그렇지 않으면 False를 반환합니다.


import time
from threading import Thread
import requests

def fetch_url(url):
    response = requests.get(url)
    print(response.text)
    return response

thread = Thread(target=fetch_url, args=('https://postman-echo.com/get',))
print('스레드 시작 전:', thread.is_alive())

thread.start()
print('스레드 시작 후:', thread.is_alive())

thread.join()
print('스레드 실행 완료 후:', thread.is_alive())

스레드 실행 결과 가져오기

스레드 실행 결과를 얻으려면 사용자 정의 스레드 클래스를 만들거나, 스레드 풀을 사용하는 것이 좋습니다.


from threading import Thread

class ResultThread(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group, target, name, args, kwargs, daemon=daemon)
        self.result = None # 결과를 저장할 속성 추가

    def run(self):
        try:
            if self._target:
                self.result = self._target(*self._args, **self._kwargs) # 함수 호출 결과 저장
        finally:
            # Target, args, kwargs 참조 제거 (메모리 누수 방지)
            del self._target, self._args, self._kwargs

def fetch_url(url):
    # 실제 요청 로직
    pass

threads = [ResultThread(target=fetch_url, args=('...',)) for i in range(10)]
[t.start() for t in threads]
[t.join() for t in threads]
results = [t.result for t in threads] # 모든 스레드 결과 수집

스레드 풀 (ThreadPoolExecutor)

스레드 풀을 사용하면 미리 정해진 개수의 스레드를 생성해두고, 작업을 해당 스레드에 할당하여 실행합니다. 이를 통해 스레드 생성 및 소멸에 드는 오버헤드를 줄이고 프로그램 응답 속도를 향상시킬 수 있습니다.


from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_url(url):
    response = requests.get(url)
    print(response.text)
    return response

# 최대 5개의 워커 스레드를 가진 스레드 풀 생성
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for i in range(10):
        # 작업을 스레드 풀에 제출
        future = executor.submit(fetch_url, 'https://postman-echo.com/get')
        futures.append(future)

    # 작업 완료 순서대로 결과 가져오기
    for future in as_completed(futures):
        try:
            result = future.result()
            # print(f"작업 완료: {result}")
        except Exception as exc:
            print(f'작업 중 예외 발생: {exc}')

# ThreadPoolExecutor는 `with` 블록을 벗어날 때 자동으로 shutdown()을 호출합니다.

스레드 잠금 (Lock)

여러 스레드가 공유 자원에 동시에 접근하여 수정할 때 데이터가 덮어쓰이거나 손실될 수 있습니다. 이를 방지하기 위해 스레드 잠금(Lock) 메커니즘을 사용합니다.

잠금 미사용 시


from threading import Thread

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        # 원자적이지 않은 연산: 읽기 -> 증가 -> 쓰기
        self.value += 1

def worker(counter):
    for _ in range(100000):
        counter.increment()

counter_obj = Counter()
threads = []
for _ in range(10):
    t = Thread(target=worker, args=(counter_obj,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 예상 값: 1,000,000 (10개 스레드 * 100,000번 증가)
# 실제 값은 레이스 컨디션으로 인해 이보다 작을 수 있습니다.
print(f"잠금 미사용 시 최종 값: {counter_obj.value}")

잠금 사용 시


from threading import Thread, Lock

class SafeCounter:
    def __init__(self):
        self.value = 0
        self.lock = Lock() # 동기화 잠금 객체 생성

    def increment(self):
        self.lock.acquire() # 잠금 획득 시도
        try:
            # 임계 영역: 공유 자원에 대한 접근
            self.value += 1
        finally:
            self.lock.release() # 잠금 해제 (에러 발생 여부와 관계없이 항상 해제)

def worker(counter):
    for _ in range(100000):
        counter.increment()

safe_counter_obj = SafeCounter()
threads = []
for _ in range(10):
    t = Thread(target=worker, args=(safe_counter_obj,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 잠금을 사용하면 예상 값인 1,000,000이 출력됩니다.
print(f"잠금 사용 시 최종 값: {safe_counter_obj.value}")

주요 스레드 잠금 종류

  • Lock: 가장 기본적인 동기화 잠금. 한 번에 하나의 스레드만 임계 영역에 접근하도록 합니다.
  • RLock (Reentrant Lock): 재진입 가능 잠금. 동일한 스레드가 여러 번 잠금을 획득할 수 있습니다. 데드락을 방지하는 데 유용합니다.
  • Semaphore: 세마포어. 동시에 특정 개수만큼의 스레드만 임계 영역에 접근하도록 제어합니다.
  • Event: 이벤트. 스레드 간의 통신 및 동기화에 사용됩니다. 특정 이벤트가 발생했음을 알리고, 다른 스레드는 해당 이벤트를 기다리게 할 수 있습니다.
  • Condition: 조건 변수. 스레드 간의 복잡한 조건 동기화에 사용됩니다. 특정 조건이 만족될 때까지 스레드를 대기시키고, 조건이 만족되면 알림(notify)을 통해 대기 중인 스레드를 깨울 수 있습니다.

재진입 잠금 (RLock)

동일한 스레드 내에서 여러 함수를 호출하며 공유 자원을 수정할 때, RLock을 사용하면 중첩된 잠금 획득으로 인한 데드락을 방지할 수 있습니다.


from threading import Thread, RLock

class RecursiveCounter:
    def __init__(self):
        self.value = 0
        self.lock = RLock() # 재진입 가능 잠금 객체 생성

    def increment(self):
        with self.lock: # 'with' 문 사용 시 자동으로 acquire/release 처리
            self.value += 1

    def decrement(self):
        with self.lock:
            self.value -= 1

def worker(counter):
    for _ in range(100000):
        counter.increment()
        counter.decrement() # 동일 스레드 내에서 다른 메서드 호출

counter_obj = RecursiveCounter()
threads = []
for _ in range(10):
    t = Thread(target=worker, args=(counter_obj,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 각 스레드 내에서 increment와 decrement가 상쇄되므로 최종 값은 0이어야 합니다.
print(f"RLock 사용 시 최종 값: {counter_obj.value}")

세마포어 (Semaphore)

세마포어는 동시에 실행될 수 있는 스레드의 수를 제한하는 데 주로 사용됩니다.


from threading import Thread, Semaphore
import requests

def fetch_url(url, semaphore):
    semaphore.acquire() # 세마포어 획득 (사용 가능한 슬롯 감소)
    try:
        response = requests.get(url)
        print(f"요청 성공: {url}")
        # print(response.text) # 응답 내용 출력 (선택 사항)
    finally:
        semaphore.release() # 세마포어 해제 (사용 가능한 슬롯 증가)
    return response

# 최대 5개의 동시 요청을 허용하는 세마포어 생성
semaphore = Semaphore(5)
threads = []
for i in range(10):
    t = Thread(target=fetch_url, args=('https://postman-echo.com/get', semaphore))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

이벤트 (Event)

이벤트는 스레드 간의 동기화 지점을 만들거나, 특정 신호가 발생할 때까지 스레드를 대기시키는 데 사용됩니다.


import threading
import requests
import time

def fetch_url(url, event):
    print(f"작업 준비: {url}")
    event.wait()  # 이벤트가 설정될 때까지 대기
    print(f"이벤트 수신, 요청 시작: {url}")
    response = requests.get(url)
    print(response.text) # 응답 내용 출력 (선택 사항)
    return response

event = threading.Event()
threads = []
for i in range(5):
    t = threading.Thread(target=fetch_url, args=('https://postman-echo.com/get', event))
    threads.append(t)
    t.start()

print("모든 스레드 시작. 3초 후 이벤트 설정...")
time.sleep(3)
event.set()  # 이벤트 설정 (대기 중인 모든 스레드 깨움)
print("이벤트 설정 완료.")

for t in threads:
    t.join()

조건 (Condition)

조건 변수는 스레드 간의 좀 더 복잡한 조건부 동기화 및 통신에 사용됩니다. 특정 조건이 충족될 때까지 스레드를 대기시키고, 다른 스레드가 해당 조건을 만족시킬 때 알림(notify)을 보낼 수 있습니다.


from threading import Thread, Lock, Condition, current_thread
import time

class SharedCounter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()
        self._condition = Condition(self._lock) # Lock 객체와 연동된 Condition 생성

    def increment(self):
        with self._lock:
            self._value += 1
            print(f"증가: {current_thread().name}, 현재 값: {self._value}")
            # 값이 변경되었으므로 대기 중인 스레드에게 알림
            self._condition.notify() # 하나만 깨우려면 notify(), 전부 깨우려면 notify_all()

    def wait_until_value_greater_than(self, target_value):
        with self._condition: # Condition 객체의 lock을 사용하여 wait() 호출
            while self._value <= target_value:
                print(f"{current_thread().name} 대기 중, 현재 값: {self._value}, 목표: {target_value}")
                self._condition.wait() # 조건이 만족될 때까지 대기 (lock을 잠시 해제)
            print(f"{current_thread().name} 대기 완료, 현재 값: {self._value}")

    def get_value(self):
        with self._lock:
            return self._value

def producer_worker(counter):
    for i in range(10):
        counter.increment()
        time.sleep(0.5)

def consumer_worker(counter, target):
    counter.wait_until_value_greater_than(target)
    print(f"{current_thread().name} 목표 값 {target} 달성!")

shared_counter = SharedCounter()

# 생산자 스레드
producer = Thread(target=producer_worker, args=(shared_counter,), name="Producer")

# 소비자 스레드 (값이 5보다 커지기를 기다림)
consumer = Thread(target=consumer_worker, args=(shared_counter, 5), name="Consumer")

producer.start()
consumer.start()

producer.join()
consumer.join()

print(f"최종 값: {shared_counter.get_value()}")

Python GIL (Global Interpreter Lock)

GIL은 Python 인터프리터의 스레드 안전성을 보장하기 위한 메커니즘으로, 한 번에 하나의 스레드만 Python 바이트코드를 실행할 수 있도록 제한합니다. GIL은 CPU 바운드 작업에서 멀티스레딩의 병렬 실행 성능을 제한하지만, I/O 바운드 작업에서는 스레드 전환을 통해 동시성을 높이는 데 기여합니다.

생산자-소비자 패턴

생산자-소비자 패턴은 여러 생산자 스레드가 데이터를 생성하여 공유 버퍼(큐)에 넣고, 여러 소비자 스레드가 이 버퍼에서 데이터를 가져와 처리하는 동시성 프로그래밍 패턴입니다. Python의 queue 모듈은 스레드 안전한 큐를 제공하여 이 패턴을 쉽게 구현할 수 있도록 돕습니다.

  • queue.Queue(): 스레드 안전한 큐를 생성합니다.
  • put(item): 큐에 항목을 추가합니다. 큐가 가득 차 있으면 새 항목을 추가할 공간이 생길 때까지 스레드를 차단합니다.
  • get(): 큐에서 항목을 가져옵니다. 큐가 비어 있으면 항목이 도착할 때까지 스레드를 차단합니다.

생산자-소비자 패턴 예제


import queue
import threading
import time

def producer(q):
    for i in range(5):
        item = f'item-{i}'
        print(f'생산 중: {item}')
        q.put(item)
        time.sleep(1) # 생산 시간 시뮬레이션

def consumer(q):
    while True:
        item = q.get() # 큐에서 항목 가져오기
        if item is None: # 종료 신호 (None)
            q.task_done() # task_done() 호출하여 큐 작업 완료 알림
            break
        print(f'소비 중: {item}')
        time.sleep(2) # 소비 시간 시뮬레이션
        q.task_done() # 큐 작업 완료 알림

if __name__ == '__main__':
    # 최대 3개의 항목을 저장할 수 있는 큐 생성
    shared_queue = queue.Queue(maxsize=3)

    producer_thread = threading.Thread(target=producer, args=(shared_queue,))
    consumer_thread = threading.Thread(target=consumer, args=(shared_queue,))

    producer_thread.start()
    consumer_thread.start()

    # 생산자 스레드가 완료될 때까지 대기
    producer_thread.join()

    # 소비자에게 종료 신호 전송
    shared_queue.put(None)

    # 모든 작업이 완료될 때까지 대기
    shared_queue.join() # 큐의 모든 task_done() 호출을 기다림
    consumer_thread.join() # 소비자 스레드 종료 대기

    print("생산자-소비자 프로세스 완료.")

멀티프로세싱

프로세스는 프로그램 실행의 기본 단위이며, 각 프로세스는 독립적인 메모리 공간과 시스템 자원을 가집니다. 멀티프로세싱은 프로그램을 여러 개의 프로세스로 분할하여 병렬로 실행함으로써 프로그램 성능과 효율성을 향상시킵니다. 멀티프로세싱은 다중 코어 CPU의 성능을 최대한 활용할 수 있습니다.

멀티프로세싱 사용 방법

Python에서는 multiprocessing 모듈을 사용하여 프로세스를 생성하고 관리합니다. 이 모듈은 Process 클래스, Pool 클래스, Queue 클래스 등을 제공합니다. Process 클래스는 단일 프로세스를 생성하는 데 사용되고, Pool 클래스는 프로세스 풀을 생성하는 데 사용되며, Queue 클래스는 프로세스 간 통신을 구현하는 데 사용됩니다.


from multiprocessing import Process
import requests
import os

def fetch_url(url):
    pid = os.getpid()
    print(f"프로세스 {pid} 시작: {url}")
    response = requests.get(url)
    # print(response.text) # 응답 내용 출력 (선택 사항)
    print(f"프로세스 {pid} 완료: {url}")
    return response

def main():
    processes = []
    urls = ['https://postman-echo.com/get?id=1', 'https://postman-echo.com/get?id=2']
    for url in urls:
        # 각 URL에 대해 별도의 프로세스 생성
        p = Process(target=fetch_url, args=(url,))
        processes.append(p)
        p.start() # 프로세스 시작

    # 모든 프로세스가 완료될 때까지 대기
    for p in processes:
        p.join()
    print("모든 프로세스 완료.")

if __name__ == '__main__':
    main()

프로세스와 스레드의 차이점

  • 자원 점유: 프로세스는 메모리, 파일, 네트워크 등 시스템 자원의 기본 할당 단위이며, 각 프로세스는 독립적인 주소 공간과 시스템 자원을 가집니다. 스레드는 프로세스 내의 실행 단위로, 프로세스의 주소 공간과 시스템 자원을 공유하지만, 각 스레드는 독립적인 스택 공간과 프로그램 카운터를 가집니다.
  • 전환 오버헤드: 프로세스 전환은 전체 프로세스 상태(메모리, 레지스터, 파일 등)를 저장하고 복원해야 하므로 스레드 전환보다 오버헤드가 큽니다. 스레드 전환은 스레드의 스택 공간과 프로그램 카운터만 저장하고 복원하면 되므로 상대적으로 가볍습니다.
  • 통신 방식: 프로세스 간 통신은 IPC(Inter-Process Communication) 메커니즘(파이프, 메시지 큐, 공유 메모리 등)을 사용해야 합니다. 스레드 간 통신은 공유 변수에 직접 접근하거나, 잠금, 세마포어, 조건 변수와 같은 스레드 동기화 원시를 사용하여 수행할 수 있습니다.
  • 안정성: 프로세스는 독립적인 주소 공간을 가지므로, 한 프로세스의 오류가 다른 프로세스에 영향을 미치지 않아 안정성이 높습니다. 스레드는 프로세스의 자원을 공유하므로, 한 스레드의 오류가 전체 프로세스에 영향을 미칠 수 있으며, 데이터 일관성과 안전성을 보장하기 위해 동기화 메커니즘이 필요합니다.
  • 생성 및 소멸: 프로세스 생성 및 소멸은 독립적인 주소 공간과 시스템 자원을 할당하고 해제해야 하므로 스레드보다 오버헤드가 큽니다. 스레드는 스택 공간과 프로그램 카운터만 할당/해제하면 되므로 상대적으로 가볍습니다.

멀티프로세싱 vs 멀티스레딩

  • CPU 집약적 작업: 계산이 많고 CPU 사용률이 높은 작업의 경우, 멀티프로세싱을 사용하는 것이 좋습니다. 멀티프로세싱은 다중 코어 CPU를 최대한 활용하고 GIL(Global Interpreter Lock)의 제약을 피하여 연산 효율을 높일 수 있습니다.
  • I/O 집약적 작업: 파일 입출력, 네트워크 통신 등 I/O 작업이 많은 경우, 멀티스레딩을 사용하는 것이 유리합니다. I/O 작업 중 발생하는 블로킹을 스레드 전환으로 회피하여 응답성과 처리량을 향상시킬 수 있습니다.
  • 데이터 공유 및 조정 작업: 데이터를 공유하고 여러 스레드(또는 프로세스) 간의 작업을 조정해야 하는 경우, 멀티스레딩 또는 멀티프로세싱을 고려할 수 있습니다. 멀티스레딩은 공유 변수에 직접 접근하므로 동기화 메커니즘이 필수적이며, 멀티프로세싱은 IPC 메커니즘을 사용해야 합니다.
  • 이식성: 다양한 운영체제에서 실행되어야 하는 경우, 멀티스레딩이 더 나은 선택일 수 있습니다. 스레드는 운영체제 수준의 스레드에 기반하므로 이식성이 높습니다. 반면, 멀티프로세싱의 IPC 메커니즘은 운영체제별로 구현 방식이 다를 수 있습니다.
  • 안정성 및 견고성: 높은 안정성과 견고성이 요구되는 애플리케이션의 경우, 멀티프로세싱이 더 적합합니다. 각 프로세스는 독립적인 메모리 공간을 가지므로, 한 프로세스의 크래시가 다른 프로세스에 영향을 미치지 않습니다.
  • 자원 사용량: 독립적인 메모리 공간을 사용하는 멀티프로세싱은 멀티스레딩보다 더 많은 메모리를 사용할 수 있습니다.
  • 유지보수성: 멀티스레딩은 코드와 데이터를 공유하므로 코드 중복이 줄어 유지보수가 용이할 수 있습니다. 멀티프로세싱은 IPC 구현으로 인해 코드가 복잡해지고 유지보수가 어려워질 수 있습니다.

프로세스 풀 (Process Pool)

프로세스 풀은 미리 정해진 개수의 워커 프로세스를 생성하여 작업을 할당하는 방식으로, 프로세스 생성 및 소멸 오버헤드를 줄여 성능을 향상시킵니다. multiprocessing 모듈의 Pool 클래스는 apply, apply_async, map, map_async와 같은 메서드를 제공하여 작업을 제출하고 관리합니다.

프로세스 풀 예제

멀티스레딩 또는 스레드 풀에 비해 멀티프로세싱 또는 프로세스 풀은 여전히 상대적으로 더 큰 오버헤드를 가집니다. 따라서 I/O 집약적인 작업(예: 네트워크 요청)에서는 성능상 큰 이점을 보이지 않을 수 있습니다.


from multiprocessing import Pool
import requests
import os

def fetch_url(url):
    pid = os.getpid()
    print(f"프로세스 {pid} 처리 중: {url}")
    response = requests.get(url)
    # print(f"응답: {response.text}")
    return f"PID {pid}: {url} processed."

def main():
    # 최대 5개의 워커 프로세스를 가진 프로세스 풀 생성
    with Pool(processes=5) as pool:
        urls = [f'https://postman-echo.com/get?task={i}' for i in range(10)]

        # map 메서드를 사용하여 모든 URL에 대해 fetch_url 함수를 병렬로 실행
        # map은 모든 작업이 완료될 때까지 기다리고 결과를 순서대로 반환
        results = pool.map(fetch_url, urls)

        print("\n모든 작업 결과:")
        for res in results:
            print(res)

if __name__ == '__main__':
    main()

멀티프로세싱 통신

Python에서는 다양한 방법으로 프로세스 간 통신(IPC)을 구현할 수 있습니다.

  • 파이프 (Pipe): 두 프로세스 간에 데이터를 주고받을 수 있는 양방향 통신 채널입니다.
  • 큐 (Queue): 여러 프로세스 간에 데이터를 안전하게 주고받을 수 있는 스레드/프로세스 안전 데이터 구조입니다.
  • 공유 메모리 (Value, Array): 여러 프로세스가 동일한 메모리 영역을 공유하여 데이터를 읽고 쓸 수 있습니다.
  • 데이터 관리자 (Manager): 여러 프로세스 간에 복잡한 데이터 구조(딕셔너리, 리스트 등)를 공유할 수 있도록 관리해주는 객체입니다.
  • 소켓 (Socket): 네트워크 통신을 사용하여 프로세스 간에 통신합니다. 로컬 호스트뿐만 아니라 분산 환경에서도 사용할 수 있습니다.

파이프 (Pipe)

파이프는 주로 두 프로세스 간의 일대일 통신에 사용됩니다.


from multiprocessing import Process, Pipe

def worker(child_conn):
    # 자식 프로세스: 부모로부터 데이터 수신
    data_from_parent = child_conn.recv()
    print(f"자식 프로세스 수신: {data_from_parent}")

    # 자식 프로세스: 부모에게 데이터 전송
    response = f"Hello from child process! Received: {data_from_parent}"
    child_conn.send(response)
    child_conn.close()

def main():
    # 양방향 파이프 생성
    parent_conn, child_conn = Pipe()

    # 워커 프로세스 생성 및 시작
    p = Process(target=worker, args=(child_conn,))
    p.start()

    # 부모 프로세스: 자식에게 데이터 전송
    message_to_child = "Message for child"
    parent_conn.send(message_to_child)
    print(f"부모 프로세스 전송: {message_to_child}")

    # 부모 프로세스: 자식으로부터 응답 수신
    response_from_child = parent_conn.recv()
    print(f"부모 프로세스 수신: {response_from_child}")

    p.join() # 자식 프로세스가 완료될 때까지 대기
    parent_conn.close()
    print("파이프 통신 완료.")

if __name__ == '__main__':
    main()

프로세스 큐 (Queue)

multiprocessing.Queue는 여러 프로세스 간에 데이터를 안전하게 전달하는 데 사용됩니다. queue.Queue는 주로 스레드 간 통신에 사용되는 반면, multiprocessing.Queue는 공유 메모리와 프로세스 잠금을 기반으로 하여 프로세스 간 통신에 최적화되어 있습니다.


from multiprocessing import Process, Queue

def worker(q):
    # 워커 프로세스: 큐에서 데이터 수신
    data_received = q.get()
    print(f"워커 프로세스 수신: {data_received}")

    # 워커 프로세스: 결과 큐에 넣기
    result = f"Processed: {data_received}"
    q.put(result)

def main():
    # 프로세스 간 통신을 위한 큐 생성
    processing_queue = Queue()

    # 워커 프로세스 생성 및 시작
    p = Process(target=worker, args=(processing_queue,))
    p.start()

    # 데이터를 큐에 넣기
    data_to_process = "Sample Data"
    processing_queue.put(data_to_process)
    print(f"메인 프로세스 전송: {data_to_process}")

    # 워커 프로세스로부터 결과 수신
    processed_result = processing_queue.get()
    print(f"메인 프로세스 수신: {processed_result}")

    p.join()
    print("큐 통신 완료.")

if __name__ == '__main__':
    main()

공유 메모리 (Value, Array)

multiprocessing 모듈의 ValueArray 객체를 사용하여 단일 변수 또는 변수 집합을 여러 프로세스 간에 공유할 수 있습니다.


from multiprocessing import Process, Value, Array
import ctypes # ctypes 라이브러리를 사용하여 C 데이터 타입 지정

def worker(idx, shared_value, shared_array):
    # shared_value는 'i' 타입 (signed integer)
    # shared_array는 'i' 타입 (signed integer) 배열, 크기는 5
    print(f"프로세스 {idx}: 시작")
    shared_value.value += 1 # 공유 정수 값 증가
    shared_array[idx] = idx * 10 # 공유 배열의 해당 인덱스에 값 저장
    print(f"프로세스 {idx}: 완료")

def main():
    # 단일 공유 정수 변수 생성 (초기값 0)
    # 'i'는 C 타입의 signed integer를 의미
    shared_val = Value('i', 0)

    # 공유 배열 생성 (크기 5, 초기값 0)
    # 'i'는 C 타입의 signed integer를 의미
    shared_arr = Array('i', 5)

    processes = []
    for i in range(5):
        p = Process(target=worker, args=(i, shared_val, shared_arr))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"\n최종 공유 값: {shared_val.value}")
    # Array 객체는 리스트처럼 접근 가능
    print(f"최종 공유 배열: {list(shared_arr)}")

if __name__ == '__main__':
    main()

데이터 관리자 (Manager)

multiprocessing.Manager를 사용하면 프로세스 간에 리스트, 딕셔너리, 큐와 같은 복잡한 데이터 구조를 공유할 수 있습니다. Manager는 서버 프로세스를 시작하고 클라이언트 프로세스가 이 서버 프로세스와 통신하여 공유 객체를 관리합니다.


from multiprocessing import Process, Manager, current_process
import time

def worker(shared_dict):
    # 현재 프로세스의 이름을 가져옴
    process_name = current_process().name
    print(f"{process_name} 시작. 현재 count: {shared_dict.get('count', 'N/A')}")

    # 공유 딕셔너리 업데이트 (카운트 증가)
    shared_dict['count'] += 1
    print(f"{process_name} count 업데이트 완료. 현재 count: {shared_dict['count']}")
    time.sleep(0.1) # 동시성 문제 시뮬레이션

def main():
    # Manager 객체 생성
    manager = Manager()

    # Manager를 통해 공유 딕셔너리 생성
    # Manager 객체는 자체 서버 프로세스를 백그라운드에서 실행합니다.
    shared_data = manager.dict({"count": 0})
    print(f"메인 프로세스 시작. 초기 count: {shared_data['count']}")

    processes = []
    for i in range(5):
        p = Process(target=worker, args=(shared_data,), name=f"Worker-{i+1}")
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"\n최종 count: {shared_data['count']}")

if __name__ == "__main__":
    main()

소켓 (Socket)

네트워크 소켓을 사용하여 여러 프로세스 간에 통신할 수 있습니다. 이는 TCP/UDP 프로토콜을 기반으로 하며, 로컬 머신뿐만 아니라 네트워크를 통해 다른 머신과도 통신할 수 있습니다.


import socket
from multiprocessing import Process

def server_process():
    # TCP 소켓 생성
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 주소와 포트 바인딩
    server_address = ('localhost', 6000)
    server_socket.bind(server_address)
    # 클라이언트 연결 대기
    server_socket.listen(1)
    print(f"서버: {server_address}에서 연결 대기 중...")

    # 클라이언트 연결 수락
    client_connection, client_address = server_socket.accept()
    print(f"서버: 클라이언트 {client_address} 연결됨.")

    # 클라이언트로부터 데이터 수신
    data = client_connection.recv(1024)
    print(f"서버: 수신: {data.decode()}")

    # 클라이언트에게 데이터 전송
    response_message = "World from Server"
    client_connection.sendall(response_message.encode())
    print(f"서버: 전송: {response_message}")

    # 연결 종료
    client_connection.close()
    server_socket.close()
    print("서버: 연결 종료.")

def client_process():
    # TCP 소켓 생성
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 서버에 연결
    server_address = ('localhost', 6000)
    client_socket.connect(server_address)
    print(f"클라이언트: 서버 {server_address}에 연결됨.")

    # 서버에게 데이터 전송
    message_to_server = "Hello from Client"
    client_socket.sendall(message_to_server.encode())
    print(f"클라이언트: 전송: {message_to_server}")

    # 서버로부터 데이터 수신
    data = client_socket.recv(1024)
    print(f"클라이언트: 수신: {data.decode()}")

    # 연결 종료
    client_socket.close()
    print("클라이언트: 연결 종료.")

if __name__ == '__main__':
    # 서버 프로세스 생성 및 시작
    server_proc = Process(target=server_process)
    server_proc.start()

    # 클라이언트 프로세스 생성 및 시작
    # 서버가 먼저 준비될 시간을 주기 위해 약간의 지연 추가
    import time
    time.sleep(1) # 서버가 바인딩되고 리슨할 시간을 줍니다.
    client_proc = Process(target=client_process)
    client_proc.start()

    # 프로세스 종료 대기
    server_proc.join()
    client_proc.join()
    print("소켓 통신 예제 완료.")

마스터-워커 패턴

마스터-워커 패턴은 병렬 컴퓨팅에서 널리 사용되는 패턴으로, 큰 작업을 여러 개의 작은 작업으로 분할하여 여러 워커에게 분배하고, 각 워커가 해당 작업을 병렬로 처리한 후, 마스터가 결과를 취합하여 최종 결과를 생성하는 방식입니다. Python에서는 multiprocessing 모듈을 사용하여 이 패턴을 구현할 수 있습니다.

마스터-워커 패턴 예제

마스터 노드는 작업을 작은 단위로 나누어 여러 워커 노드에 할당합니다. 워커 노드는 작업을 받아 처리하고 결과를 마스터 노드로 반환합니다. 마스터 노드는 모든 워커로부터 결과를 수집하여 최종 결과를 구성합니다.


from multiprocessing import Process, Queue, cpu_count
import time

def worker(task_queue, result_queue):
    """워커 프로세스: 작업을 받아 처리하고 결과를 반환"""
    while True:
        task = task_queue.get() # 작업 큐에서 작업 가져오기
        if task is None: # 종료 신호 (None)
            break
        # 실제 작업 수행 (예: 작업 값에 2 곱하기)
        result = task * 2
        print(f"{current_process().name}: 작업 {task} 처리 완료, 결과 {result}")
        result_queue.put(result) # 결과 큐에 결과 넣기
        time.sleep(0.1) # 작업 시간 시뮬레이션

def master():
    """마스터 프로세스: 작업을 생성하고 워커에게 분배, 결과 취합"""
    # 처리할 작업 목록
    tasks_to_do = [1, 2, 3, 4, 5, 6, 7, 8]
    task_queue = Queue()
    result_queue = Queue()

    # 작업 큐에 작업 넣기
    for task in tasks_to_do:
        task_queue.put(task)

    # 사용 가능한 CPU 코어 수만큼 워커 생성
    num_workers = cpu_count()
    print(f"마스터: {num_workers}개의 워커 프로세스 생성.")
    workers = []
    for _ in range(num_workers):
        p = Process(target=worker, args=(task_queue, result_queue))
        workers.append(p)
        p.start()

    # 모든 작업이 큐에 들어갔음을 알림 (각 워커에게 None을 보내 종료 신호)
    for _ in range(num_workers):
        task_queue.put(None)

    # 모든 워커 프로세스가 완료될 때까지 대기
    for w in workers:
        w.join()

    # 결과 큐에서 모든 결과 수집
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    print('\n마스터: 모든 작업 결과 취합 완료.')
    print(f"결과: {sorted(results)}") # 결과를 정렬하여 출력

if __name__ == '__main__':
    master()

코루틴 (Coroutine)

코루틴은 '사용자 수준 스레드' 또는 '녹색 스레드'라고도 불리는 경량 스레드입니다. 코루틴은 실행 중에 상태를 저장하고 일시 중지했다가 필요할 때 다시 시작할 수 있는 특별한 종류의 함수입니다. 단일 스레드 내에서 동시성을 구현하여 프로그램 효율성과 응답 속도를 높일 수 있습니다.

코루틴 사용 방법

Python에서는 asyncio 모듈을 사용하여 코루틴을 구현합니다. asyncioasyncawait 키워드를 제공하여 코루틴 함수와 코루틴 객체를 정의합니다. 코루틴 함수는 실행 중에 다른 코루틴의 완료를 기다렸다가 재개될 수 있는 특별한 함수입니다. 코루틴 객체는 스케줄러에 의해 예약되고 실행될 수 있는 특별한 객체입니다.

코루틴 작성 단계:

  1. 비동기(코루틴) 함수 정의: async def fetch_data(url): ...
  2. 메인(비동기) 함수 정의: async def main(): ...
  3. await 키워드를 사용하여 다른 코루틴 함수의 완료를 기다리고 자동으로 실행을 전환합니다.
  4. asyncio.run()을 사용하여 메인 코루틴 함수를 실행합니다.

예제 - 결과 수집 안 함


import aiohttp # pip install aiohttp 필요
import asyncio

async def fetch_url(url):
    """주어진 URL에서 데이터를 비동기적으로 가져오는 코루틴"""
    print(f"요청 시작: {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # 응답 본문 비동기적으로 읽기
            body = await response.text()
            print(f"응답 수신 완료: {url}")
            # print(body) # 응답 내용 출력 (선택 사항)
            return body

async def main():
    """메인 코루틴: 여러 URL에 대한 요청을 순차적으로 실행"""
    urls_to_fetch = [f'https://postman-echo.com/get?id={i}' for i in range(5)]
    for url in urls_to_fetch:
        await fetch_url(url) # 각 요청이 완료될 때까지 기다림
    print("모든 요청 완료.")

if __name__ == '__main__':
    # 비동기 메인 함수 실행
    asyncio.run(main())

예제 - 결과 수집


import aiohttp # pip install aiohttp 필요
import asyncio

async def fetch_url(url):
    """주어진 URL에서 데이터를 비동기적으로 가져오는 코루틴"""
    print(f"요청 시작: {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            body = await response.text()
            print(f"응답 수신 완료: {url}")
            return {"url": url, "status": response.status} # 결과로 URL과 상태 코드 반환

async def main():
    """메인 코루틴: 여러 URL에 대한 요청을 동시에 실행하고 결과 수집"""
    urls_to_fetch = [f'https://postman-echo.com/get?id={i}' for i in range(10)]
    # 여러 코루틴을 동시에 실행할 작업 리스트 생성
    tasks = [fetch_url(url) for url in urls_to_fetch]

    # asyncio.gather를 사용하여 모든 작업을 동시에 실행하고 결과 수집
    results = await asyncio.gather(*tasks)

    print("\n모든 작업 완료. 결과:")
    for result in results:
        print(result)

if __name__ == '__main__':
    asyncio.run(main())

코루틴 동시성 제어

코루틴은 스레드 전환 오버헤드가 없어 효율적이지만, 너무 많은 코루틴이 동시에 실행되면 시스템 자원 고갈로 성능이 저하될 수 있습니다. 따라서 코루틴의 동시 실행 수를 제어하는 것이 중요합니다. asyncio.Semaphore 또는 asyncio.Queue를 사용하여 동시성을 제한할 수 있습니다.


import aiohttp # pip install aiohttp 필요
import asyncio

async def fetch_url(url, semaphore):
    """세마포어를 사용하여 동시성 제어하며 URL 가져오기"""
    async with semaphore: # 세마포어 획득 (동시 요청 수 제한)
        print(f"세마포어 획득, 요청 시작: {url}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                body = await response.text()
                print(f"응답 수신 완료: {url}")
                return {"url": url, "status": response.status}

async def main():
    """메인 코루틴: 세마포어를 사용하여 동시성 제어하며 요청 실행"""
    # 최대 5개의 동시 요청을 허용하는 세마포어 생성
    semaphore = asyncio.Semaphore(5)
    urls_to_fetch = [f'https://postman-echo.com/get?id={i}' for i in range(10)]

    tasks = [fetch_url(url, semaphore) for url in urls_to_fetch]
    results = await asyncio.gather(*tasks)

    print("\n모든 작업 완료. 결과:")
    for result in results:
        print(result)

if __name__ == '__main__':
    asyncio.run(main())

Python 네트워킹 프로그래밍

Python은 네트워킹 프로그래밍을 위한 풍부한 라이브러리와 모듈을 제공합니다.

주요 Python 네트워킹 라이브러리 및 모듈:

  • socket: 저수준 네트워크 인터페이스(소켓)를 제공합니다.
  • ssl: 소켓에 TLS/SSL 암호화를 제공합니다.
  • socketserver: 소켓 기반 TCP 및 UDP 서버를 쉽게 구축할 수 있도록 합니다.
  • urllib: URL 처리 및 HTTP 요청 전송 기능을 제공합니다.
  • http: 간단한 HTTP 서버 및 클라이언트 구현을 제공합니다.
  • ftplib: FTP 프로토콜 클라이언트 기능을 제공합니다.
  • poplib: POP3 프로토콜 클라이언트 기능을 제공합니다.
  • imaplib: IMAP 프로토콜 클라이언트 기능을 제공합니다.
  • smtplib: SMTP 프로토콜 클라이언트 기능을 제공합니다.
  • xmlrpc: XML-RPC 서버 및 클라이언트 구현을 제공합니다.

Python TCP 및 UDP 프로토콜

Python에서는 socket 모듈을 사용하여 TCP 및 UDP 프로토콜을 다룰 수 있습니다.


# 소켓 생성: socket.socket(, )

# 주소 체계 (address_family):
# - socket.AF_UNIX: 유닉스 도메인 소켓
# - socket.AF_INET: IPv4
# - socket.AF_INET6: IPv6

# 소켓 타입 (socket_type):
# - socket.SOCK_STREAM: TCP 연결 (연결 지향, 스트림 기반)
# - socket.SOCK_DGRAM: UDP 데이터그램 (비연결 지향, 데이터그램 기반)

# 예: IPv4 TCP 소켓 생성
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 예: IPv4 UDP 소켓 생성
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

TCP 서버 및 클라이언트

socket 모듈 사용

TCP 서버 및 클라이언트 소켓의 일반적인 메서드:

  • bind(address): 서버 소켓을 특정 주소와 포트에 바인딩합니다.
  • listen(backlog): TCP 서버에서 클라이언트 연결을 수신 대기합니다. backlog은 대기 큐의 최대 크기입니다.
  • accept(): TCP 서버에서 클라이언트 연결을 수락합니다. 연결된 클라이언트 소켓 객체와 클라이언트 주소를 반환합니다.
  • connect(address): TCP 클라이언트가 서버에 연결합니다.
  • send(bytes): 데이터를 전송합니다.
  • recv(buffer_size): 데이터를 수신합니다. buffer_size는 한 번에 받을 최대 바이트 수입니다.
  • close(): 소켓 연결을 닫습니다.

# TCP 서버 예제
import socket

IPv4, TCP = socket.AF_INET, socket.SOCK_STREAM
SERVER_ADDRESS = ('127.0.0.1', 9999)  # 서버 주소 및 포트

# 'with' 문을 사용하여 소켓이 자동으로 닫히도록 함
with socket.socket(IPv4, TCP) as server_socket:
    server_socket.bind(SERVER_ADDRESS)  # 서버 주소에 바인딩
    server_socket.listen(5)  # 최대 5개의 동시 연결 대기
    print(f"서버 시작: {SERVER_ADDRESS}. 연결 대기 중...")

    while True:
        try:
            # 클라이언트 연결 수락
            client_socket, addr = server_socket.accept()
            print(f'클라이언트 연결됨: {addr}')

            # 클라이언트에게 환영 메시지 전송
            message = f'Welcome client from {addr}'
            client_socket.sendall(message.encode('utf-8'))

            # 클라이언트와의 통신 후 소켓 닫기 (여기서는 간단히 메시지 하나만 보내고 닫음)
            client_socket.close()
            print(f"클라이언트 {addr} 연결 종료.")

        except KeyboardInterrupt:
            print("서버 종료 중...")
            break
        except Exception as e:
            print(f"오류 발생: {e}")
            break
print("서버가 종료되었습니다.")


# TCP 클라이언트 예제
import socket

IPv4, TCP = socket.AF_INET, socket.SOCK_STREAM
SERVER_ADDRESS = ('127.0.0.1', 9999)  # 접속할 서버 주소 및 포트

with socket.socket(IPv4, TCP) as client_socket:
    try:
        client_socket.connect(SERVER_ADDRESS)  # 서버에 연결 시도
        print(f"서버 {SERVER_ADDRESS}에 연결되었습니다.")

        # 서버로부터 메시지 수신
        message = client_socket.recv(1024) # 최대 1024 바이트 수신
        print(f"서버로부터 받은 메시지: {message.decode('utf-8')}")

    except ConnectionRefusedError:
        print(f"오류: 서버 {SERVER_ADDRESS}에 연결할 수 없습니다. 서버가 실행 중인지 확인하세요.")
    except Exception as e:
        print(f"오류 발생: {e}")
    # 'with' 블록을 벗어나면 자동으로 client_socket.close() 호출

socketserver 모듈 사용

socketserver 모듈의 TCPServer를 사용하여 TCP 서버를 더 쉽게 구현할 수 있습니다. 요청 처리는 BaseRequestHandler를 상속받은 핸들러 클래스의 handle() 메서드에서 구현합니다.

핸들러 클래스의 주요 메서드:

  • self.request.recv(buffer_size): 클라이언트로부터 데이터 수신
  • self.client_address: 클라이언트의 주소 (IP, 포트) 튜플
  • self.request.sendall(bytes): 클라이언트에게 데이터 전송

# TCP 서버 예제 2 (socketserver 사용)
from socketserver import TCPServer, BaseRequestHandler
import time

SERVER_ADDRESS = ('127.0.0.1', 9991)  # 서버 주소 및 포트

class MyTCPHandler(BaseRequestHandler):
    """
    각 클라이언트 연결을 처리하는 요청 핸들러
    """
    def handle(self):
        print(f'클라이언트 연결됨: {self.client_address}')
        # 클라이언트에게 메시지 전송
        message = "Hello from socketserver!"
        self.request.sendall(message.encode('utf-8'))
        print(f"클라이언트 {self.client_address}에게 메시지 전송 완료.")
        # handle 메서드가 완료되면 자동으로 연결이 닫힙니다.

# TCPServer 인스턴스 생성 및 서버 실행
# TCPServer는 요청을 처리할 핸들러 클래스를 인수로 받습니다.
with TCPServer(SERVER_ADDRESS, MyTCPHandler) as server:
    print(f"socketserver 기반 TCP 서버 시작: {SERVER_ADDRESS}")
    # serve_forever()는 서버를 계속 실행하여 들어오는 요청을 처리합니다.
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("서버 종료 중...")
    except Exception as e:
        print(f"오류 발생: {e}")
print("서버가 종료되었습니다.")

# TCP 클라이언트 예제 (이전 클라이언트 코드와 동일하게 작동)
import socket

IPv4, TCP = socket.AF_INET, socket.SOCK_STREAM
SERVER_ADDRESS = ('127.0.0.1', 9991)  # 접속할 서버 주소 및 포트

with socket.socket(IPv4, TCP) as client_socket:
    try:
        client_socket.connect(SERVER_ADDRESS)
        print(f"서버 {SERVER_ADDRESS}에 연결되었습니다.")
        message = client_socket.recv(1024)
        print(f"서버로부터 받은 메시지: {message.decode('utf-8')}")
    except ConnectionRefusedError:
        print(f"오류: 서버 {SERVER_ADDRESS}에 연결할 수 없습니다.")
    except Exception as e:
        print(f"오류 발생: {e}")

UDP 서버 및 클라이언트

socket 모듈 사용

UDP 서버 및 클라이언트 소켓의 일반적인 메서드:

  • bind(address): 서버 소켓을 특정 주소와 포트에 바인딩합니다.
  • recvfrom(buffer_size): 데이터를 수신합니다. 수신된 데이터와 발신자 주소를 튜플로 반환합니다.
  • sendto(bytes, address): 지정된 주소로 데이터를 전송합니다.
  • close(): 소켓 연결을 닫습니다.

# UDP 서버 예제
import socket

IPv4, UDP = socket.AF_INET, socket.SOCK_DGRAM
SERVER_ADDRESS = ("127.0.0.1", 9998)

with socket.socket(IPv4, UDP) as server_socket:
    server_socket.bind(SERVER_ADDRESS) # 서비스 주소에 바인딩
    print(f"UDP 서버 시작: {SERVER_ADDRESS}. 데이터 수신 대기 중...")

    while True:
        try:
            # 데이터와 클라이언트 주소 수신
            data, client_addr = server_socket.recvfrom(1024)
            print(f"클라이언트 {client_addr}로부터 수신: {data.decode('utf-8')}")

            # 클라이언트에게 응답 전송
            response_msg = "Hello from UDP server!"
            server_socket.sendto(response_msg.encode('utf-8'), client_addr)
            print(f"클라이언트 {client_addr}에게 응답 전송.")

        except KeyboardInterrupt:
            print("서버 종료 중...")
            break
        except Exception as e:
            print(f"오류 발생: {e}")
            break
print("UDP 서버가 종료되었습니다.")

# UDP 클라이언트 예제
import socket

IPv4, UDP = socket.AF_INET, socket.SOCK_DGRAM
SERVER_ADDRESS = ("127.0.0.1", 9998) # 접속할 서버 주소 및 포트

with socket.socket(IPv4, UDP) as client_socket:
    message = "Ping from UDP client!"
    try:
        # 서버로 메시지 전송
        client_socket.sendto(message.encode('utf-8'), SERVER_ADDRESS)
        print(f"서버 {SERVER_ADDRESS}로 메시지 전송: {message}")

        # 서버로부터 응답 수신
        # UDP는 비연결형이므로, sendto 후 recvfrom으로 응답을 기다립니다.
        data, server_addr = client_socket.recvfrom(1024)
        print(f"서버 {server_addr}로부터 수신: {data.decode('utf-8')}")

    except Exception as e:
        print(f"오류 발생: {e}")

socketserver 모듈 사용

UDPServer를 사용하여 UDP 서버를 구현할 수도 있습니다. 마찬가지로 BaseRequestHandler를 상속받은 핸들러 클래스를 사용합니다.

핸들러 클래스의 주요 속성 및 메서드:

  • self.request: 튜플 형태로 (수신된 데이터, 클라이언트 소켓)을 포함합니다.
  • self.client_address: 클라이언트의 주소 (IP, 포트) 튜플
  • self.sendto(bytes, address): 클라이언트에게 데이터 전송

# UDP 서버 예제 2 (socketserver 사용)
from socketserver import UDPServer, BaseRequestHandler

SERVER_ADDRESS = ("127.0.0.1", 9998)

class MyUDPHandler(BaseRequestHandler):
    """
    UDP 요청을 처리하는 핸들러
    """
    def handle(self):
        # self.request는 (data, socket_connection) 튜플입니다.
        data, client_socket = self.request
        client_address = self.client_address # 클라이언트 주소

        print(f"클라이언트 {client_address}로부터 수신: {data.decode('utf-8')}")

        # 클라이언트에게 응답 전송
        response_msg = "Hello from socketserver UDP!"
        client_socket.sendto(response_msg.encode('utf-8'), client_address)
        print(f"클라이언트 {client_address}에게 응답 전송.")

# UDPServer 인스턴스 생성 및 서버 실행
with UDPServer(SERVER_ADDRESS, MyUDPHandler) as server:
    print(f"socketserver 기반 UDP 서버 시작: {SERVER_ADDRESS}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("서버 종료 중...")
    except Exception as e:
        print(f"오류 발생: {e}")
print("UDP 서버가 종료되었습니다.")

# UDP 클라이언트 예제 (이전 클라이언트 코드와 동일하게 작동)
import socket

IPv4, UDP = socket.AF_INET, socket.SOCK_DGRAM
SERVER_ADDRESS = ("127.0.0.1", 9998)

with socket.socket(IPv4, UDP) as client_socket:
    message = "Ping from socketserver UDP client!"
    try:
        client_socket.sendto(message.encode('utf-8'), SERVER_ADDRESS)
        print(f"서버 {SERVER_ADDRESS}로 메시지 전송: {message}")
        data, server_addr = client_socket.recvfrom(1024)
        print(f"서버 {server_addr}로부터 수신: {data.decode('utf-8')}")
    except Exception as e:
        print(f"오류 발생: {e}")

HTTP 프로토콜

http 라이브러리

Python 내장 http 모듈은 http.server, http.client, http.cookies, http.cookiejar, HTTPStatus 등을 포함합니다.

  • http.server: TCPServer 기반의 간단한 HTTP 서버 (정적 파일 서버)를 제공합니다.
  • http.client: 저수준 HTTP 프로토콜 클라이언트 기능을 제공합니다. 고수준 URL 접근에는 urllib.request를 사용하는 것이 좋습니다.

# 간단한 HTTP 서버 예제 (정적 파일 제공)
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer

SERVER_ADDRESS = ("127.0.0.1", 8000)

# SimpleHTTPRequestHandler는 현재 디렉토리의 파일을 제공합니다.
with TCPServer(SERVER_ADDRESS, SimpleHTTPRequestHandler) as httpd:
    print(f"HTTP 서버 시작: http://127.0.0.1:8000")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("서버 종료 중...")
print("HTTP 서버가 종료되었습니다.")

# 웹 브라우저에서 http://127.0.0.1:8000 에 접속하면
# 위에서 실행된 디렉토리의 파일 목록이 표시됩니다.

# http.client 사용 예제
from http.client import HTTPConnection

# HTTPS 연결 예제 (SSL/TLS 사용)
# conn = HTTPSConnection("postman-echo.com") # HTTPS Connection

# HTTP 연결 예제
conn = HTTPConnection("postman-echo.com")

try:
    conn.request("GET", "/get") # GET 요청 전송
    response = conn.getresponse() # 응답 받기

    print(f'응답 상태 코드: {response.status}')
    print(f'응답 상태 메시지: {response.reason}')

    # 응답 데이터 읽기 (바이트 형태)
    response_data = response.read()
    print('응답 데이터:', response_data.decode('utf-8'))

except Exception as e:
    print(f"요청 중 오류 발생: {e}")
finally:
    conn.close() # 연결 종료

urllib 라이브러리

Python의 urllib 라이브러리는 urllib.request (HTTP 요청 전송), urllib.parse (URL 파싱) 등 관련 모듈을 제공합니다.

  • urllib.request.urlopen(url): 지정된 URL을 열고(요청) 응답 데이터를 반환합니다.
  • urllib.parse.urlparse(url): URL을 파싱하여 각 구성 요소(scheme, netloc, path 등)를 분리합니다.
  • urllib.parse.quote(string): 문자열 데이터를 URL 인코딩합니다.
  • urllib.parse.unquote(string): URL 인코딩된 문자열을 디코딩합니다.
  • urllib.parse.urlencode(query_dict): 딕셔너리 데이터를 URL 쿼리 문자열 형식으로 인코딩합니다.

from urllib import request, parse

# URL 쿼리 파라미터 인코딩
query_params = {'name': 'Alice', 'age': 30}
encoded_query = parse.urlencode(query_params) # 'name=Alice&age=30'

# URL 생성
base_url = "https://postman-echo.com/get"
full_url = f"{base_url}?{encoded_query}"
print(f"요청 URL: {full_url}")

try:
    # URL 열기 (HTTP GET 요청)
    with request.urlopen(full_url) as response:
        # 응답 데이터 읽기 및 디코딩
        response_body = response.read().decode('utf-8')
        print("\n응답 데이터:")
        print(response_body)
except Exception as e:
    print(f"URL 요청 중 오류 발생: {e}")

from urllib.parse import quote, unquote, urlencode

# URL 디코딩 예제
encoded_url = 'https://example.com/search?query=%ED%8C%8C%EC%9D%B4%EC%8D%AC' # '파이썬'이 URL 인코딩된 형태
decoded_url = unquote(encoded_url)
print(f"URL 디코딩 결과: {decoded_url}")

# URL 인코딩 예제
# 일반 문자열 인코딩
text_to_encode = "special chars: &?"
quoted_text = quote(text_to_encode)
print(f"문자열 인코딩 결과: {quoted_text}")

# 딕셔너리 데이터를 쿼리 문자열로 인코딩
params_dict = {'keyword': 'Python', 'lang': 'ko'}
encoded_params = urlencode(params_dict)
print(f"쿼리 파라미터 인코딩 결과: {encoded_params}")

# URL에 쿼리 파라미터 결합
base = "https://example.com/api"
final_url = f"{base}?{encoded_params}"
print(f"최종 URL: {final_url}")

Flask/FastAPI를 사용한 HTTP 서비스 구축

Flask는 Python의 경량 웹 프레임워크로, API 개발이 매우 간편합니다. FastAPI는 고성능 비동기 웹 프레임워크로, Flask와 유사한 개발 스타일을 가지며 자동 API 문서 생성 기능도 제공합니다.


# 파일명: flask_app.py
# 설치 필요: pip install Flask
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/', methods=['GET'])
def home():
    """루트 경로에 대한 GET 요청 처리"""
    return jsonify({'message': 'Hello from Flask!'})

@app.route('/user/<username>')
def show_user_profile(username):
    """사용자 프로필 경로 처리"""
    return jsonify({'user': username})

# 서비스 실행 방법:
# 터미널에서: flask --app flask_app.py --port 5000 run
# 또는 python flask_app.py 실행 후 http://127.0.0.1:5000/ 에 접속
if __name__ == '__main__':
    app.run(port=5000)

# 파일명: fastapi_app.py
# 설치 필요: pip install fastapi uvicorn[standard]
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/")
async def read_root():
    """루트 경로에 대한 GET 요청 처리"""
    return {"message": "Hello from FastAPI!"}

@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str | None = None):
    """아이템 ID와 선택적 쿼리 문자열을 받는 GET 요청 처리"""
    return {"item_id": item_id, "q": q}

# 서비스 실행 방법:
# 터미널에서: uvicorn fastapi_app:app --reload --port 5001
# 또는 python fastapi_app.py 실행 후 http://127.0.0.1:5001/ 에 접속
# http://127.0.0.1:5001/docs 에서 API 문서 확인 가능
if __name__ == '__main__':
    uvicorn.run(app, host="127.0.0.1", port=5001)

Django를 사용한 HTTP 서비스 구축

Django는 Python의 풀스택 웹 프레임워크입니다. pip install django로 설치 후, 프로젝트 및 앱을 생성합니다.

# Django 프로젝트 및 앱 생성
$ django-admin startproject myproject
$ cd myproject
$ python manage.py startapp myapp

myproject/settings.py 파일의 INSTALLED_APPS'myapp'을 등록해야 합니다. myapp/views.py에 API 로직을 작성하고, myapp/urls.py에 URL 패턴을 연결한 후, myproject/urls.py에서 해당 URL 설정을 포함시켜야 합니다.


# myapp/views.py
from django.http import JsonResponse

def home_view(request):
    """간단한 JSON 응답을 반환하는 뷰"""
    return JsonResponse({'message': 'Hello from Django!'})

def user_view(request, user_id):
    """사용자 ID를 받아 JSON 응답 반환"""
    return JsonResponse({'user_id': user_id, 'message': 'User profile'})

# myapp/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('hello/', views.home_view, name='home'),
    path('user//', views.user_view, name='user_profile'),
]

# myproject/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('myapp.urls')), # myapp의 URL 설정 포함
]

서비스 실행: python manage.py runserver 5002

접속: http://127.0.0.1:5002/api/hello/

requests 라이브러리로 HTTP 요청 보내기

requests는 Python에서 HTTP 요청을 보내는 데 널리 사용되는 매우 편리한 서드파티 라이브러리입니다. 비동기 및 HTTP/2는 지원하지 않습니다.

설치: pip install requests


import requests

# GET 요청 - 쿼리 파라미터 및 사용자 정의 헤더 포함
response_get = requests.get(
    "https://postman-echo.com/get",
    params={'name': 'Lin Yuan', 'age': 21}, # 쿼리 파라미터
    headers={'X-Custom-Header': 'MyValue'} # 사용자 정의 헤더
)
print("GET 요청 응답:")
print(response_get.text)
print("-" * 20)

# POST 폼 데이터 요청
response_post_form = requests.post(
    "https://postman-echo.com/post",
    data={'username': 'testuser', 'password': 'password123'} # 폼 데이터
)
print("POST (form) 요청 응답:")
print(response_post_form.text)
print("-" * 20)

# POST JSON 요청
response_post_json = requests.post(
    "https://postman-echo.com/post",
    json={'name': 'Lin Yuan', 'data': {'key': 'value'}} # JSON 데이터
)
print("POST (json) 요청 응답:")
print(response_post_json.text)
print("-" * 20)

# HTTP/2 요청 (requests는 기본적으로 HTTP/1.1 사용)
# HTTP/2 테스트를 위해서는 별도 라이브러리 필요
# response_http2 = requests.get("https://http2.pro/api/v1")
# print("HTTP/2 요청 응답 (requests):")
# print(response_http2.text)

hyper 라이브러리로 HTTP/2 요청 보내기

hyper는 다양한 프로토콜을 지원하는 HTTP 요청 라이브러리로, HTTP/2를 지원합니다.

설치: pip install hyper


import hyper

try:
    # HTTP/2 연결 생성
    # hyper는 URL에 스킴(http:// 또는 https://)이 필요합니다.
    conn = hyper.HTTP20Connection("http2.pro", port=443, secure=True) # HTTPS 기본 포트 443

    # GET 요청 전송
    conn.request("GET", "/api/v1")

    # 응답 받기
    resp = conn.get_response()

    # 응답 상태 코드 및 헤더 출력
    print(f"상태: {resp.status}")
    print(f"헤더: {resp.headers}")

    # 응답 본문 읽기
    response_body = resp.read()
    print("응답 본문:", response_body.decode('utf-8'))

except Exception as e:
    print(f"HTTP/2 요청 중 오류 발생: {e}")
finally:
    if 'conn' in locals():
        conn.close() # 연결 종료

aiohttp 라이브러리로 비동기 HTTP 요청 보내기

aiohttp는 고성능 비동기 HTTP 요청 라이브러리로, 코루틴과 함께 사용됩니다.

설치: pip install aiohttp


import aiohttp
import asyncio

async def fetch_data():
    """aiohttp를 사용하여 비동기 HTTP GET 요청 보내기"""
    async with aiohttp.ClientSession() as session:
        url = 'http://postman-echo.com/get'
        try:
            async with session.get(url) as response:
                print(f"상태 코드: {response.status}")
                print(f"콘텐츠 타입: {response.headers.get('content-type')}")
                # 응답 본문 비동기적으로 읽기
                body = await response.text()
                print("응답 데이터:")
                print(body[:200] + "..." if len(body) > 200 else body) # 내용 일부만 출력
        except aiohttp.ClientError as e:
            print(f"aiohttp 요청 중 오류 발생: {e}")

async def main():
    await fetch_data()

if __name__ == '__main__':
    asyncio.run(main())

PycURL 라이브러리로 HTTP 요청 보내기

PycURL은 libcurl 라이브러리의 Python 래퍼로, HTTP, HTTPS, FTP, SMTP 등 다양한 프로토콜을 지원하며, requests와 유사한 API를 제공합니다.

설치: pip install pycurl

macOS 설치 시 주의사항:


export PYCURL_SSL_LIBRARY=openssl
export LDFLAGS=-L/usr/local/opt/openssl/lib
export CPPFLAGS=-I/usr/local/opt/openssl/include
pip install pycurl --compile --no-cache-dir

import pycurl
import io

# 요청을 보낼 URL
url_to_fetch = 'https://postman-echo.com/get'

# 응답 내용을 저장할 버퍼 생성
buffer = io.BytesIO()

# Curl 객체 생성
c = pycurl.Curl()
try:
    # 요청 설정
    c.setopt(pycurl.URL, url_to_fetch) # 요청 URL 설정
    c.setopt(pycurl.WRITEDATA, buffer) # 응답 내용을 buffer에 쓰도록 설정
    c.setopt(pycurl.FOLLOWLOCATION, True) # 리다이렉션 따라가기

    # 요청 실행
    c.perform()

    # HTTP 상태 코드 및 기타 정보 가져오기
    http_code = c.getinfo(pycurl.HTTP_CODE)
    print(f"HTTP 상태 코드: {http_code}")
    print(f"총 요청 시간: {c.getinfo(pycurl.TOTAL_TIME):.4f}초")
    print(f"네임 서버 조회 시간: {c.getinfo(pycurl.NAMELOOKUP_TIME):.4f}초")
    print(f"서버 연결 시간: {c.getinfo(pycurl.CONNECT_TIME):.4f}초")
    print(f"첫 바이트 수신 시간: {c.getinfo(pycurl.STARTTRANSFER_TIME):.4f}초")

    # 버퍼에서 응답 내용 읽기 및 디코딩
    response_body = buffer.getvalue().decode('utf-8')
    print("\n응답 내용 (일부):")
    print(response_body[:300] + "..." if len(response_body) > 300 else response_body)

except pycurl.error as e:
    print(f"PycURL 오류 발생: {e}")
finally:
    # Curl 객체 정리
    c.close()

requestz 라이브러리로 HTTP 요청 보내기

requestzPycURL을 기반으로 requests와 유사한 사용성을 제공하며, HTTP/2 지원 및 응답 통계 기능을 포함하는 라이브러리입니다.

설치: pip install requestz


import requestz

try:
    # HTTP/2 테스트 URL로 GET 요청 보내기
    response = requestz.get("https://http2.pro/api/v1")

    print("응답 텍스트:")
    print(response.text)

    print("\n응답 통계:")
    print(response.stats) # 응답 관련 통계 정보 출력

except Exception as e:
    print(f"requestz 요청 중 오류 발생: {e}")

WebSocket 프로토콜

WebSocket은 웹 브라우저와 서버 간의 실시간 양방향 통신을 위한 프로토콜입니다. 서버가 클라이언트에게 능동적으로 메시지를 푸시할 수 있게 해주며, HTTP를 통해 핸드셰이크를 수행한 후 독립적인 TCP 연결을 통해 통신합니다. WebSocket URL 스킴은 ws:// (암호화되지 않음) 또는 wss:// (암호화됨)입니다.

WebSocket 프로토콜의 장점:

  • 실시간성: 서버의 새로운 메시지를 즉시 클라이언트로 푸시하여 실시간 통신이 가능합니다.
  • 양방향 통신: 클라이언트와 서버 간에 자유로운 데이터 송수신이 가능합니다.
  • 효율성: TCP 기반으로 신뢰성 있고 효율적인 데이터 전송을 보장합니다.

Python에서는 websockets 라이브러리를 사용하여 WebSocket 서버를 구축하거나 클라이언트 요청을 보낼 수 있습니다.

설치: pip install websockets


# WebSocket 서버 예제
import asyncio
import websockets

async def handle_connection(websocket, path):
    """
    클라이언트 연결을 처리하는 비동기 함수
    :param websocket: 클라이언트 소켓 객체
    :param path: 클라이언트가 요청한 경로 (여기서는 사용되지 않음)
    """
    print(f"클라이언트 연결됨: {websocket.remote_address}")
    try:
        # 클라이언트로부터 메시지 수신 (첫 번째 메시지)
        client_message = await websocket.recv()
        print(f"< 클라이언트: {client_message}")

        # 클라이언트에게 인사 메시지 전송
        greeting = f"Hello, {client_message}!"
        await websocket.send(greeting)
        print(f"> 서버: {greeting}")

        # 계속해서 메시지를 주고받을 수 있습니다.
        # 예: 다른 메시지 수신 및 응답
        # another_message = await websocket.recv()
        # await websocket.send(f"Received your second message: {another_message}")

    except websockets.exceptions.ConnectionClosedOK:
        print(f"클라이언트 {websocket.remote_address} 연결이 정상적으로 종료되었습니다.")
    except websockets.exceptions.ConnectionClosedError as e:
        print(f"클라이언트 {websocket.remote_address} 연결 오류: {e}")
    finally:
        print(f"클라이언트 {websocket.remote_address} 연결 종료.")

async def run_server():
    """WebSocket 서버를 시작하고 실행"""
    server_address = "localhost"
    server_port = 8900
    # websockets.serve 함수를 사용하여 서버 시작
    # handle_connection 함수가 각 새 연결에 대해 호출됩니다.
    async with websockets.serve(handle_connection, server_address, server_port):
        print(f"WebSocket 서버 시작됨: ws://{server_address}:{server_port}")
        await asyncio.Future()  # 서버를 계속 실행 상태로 유지

if __name__ == "__main__":
    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        print("서버 종료 중...")

# WebSocket 클라이언트 예제
import asyncio
import websockets

async def send_message():
    """WebSocket 서버에 연결하고 메시지 송수신"""
    server_uri = "ws://localhost:8900" # 서버 주소
    try:
        # 서버에 비동기적으로 연결
        async with websockets.connect(server_uri) as websocket:
            print(f"서버 {server_uri}에 연결됨.")

            # 사용자로부터 이름 입력 받기
            name = input("당신의 이름은 무엇인가요? ")

            # 서버에 이름 전송
            await websocket.send(name)
            print(f"> 서버로 전송: {name}")

            # 서버로부터 인사 메시지 수신
            greeting = await websocket.recv()
            print(f"< 서버로부터 수신: {greeting}")

            # 연결이 유지되는 동안 계속 통신할 수 있습니다.
            # 예: 추가 메시지 전송 및 수신
            # await websocket.send("How are you?")
            # response = await websocket.recv()
            # print(f"< 서버: {response}")

    except ConnectionRefusedError:
        print(f"오류: 서버 {server_uri}에 연결할 수 없습니다. 서버가 실행 중인지 확인하세요.")
    except Exception as e:
        print(f"클라이언트 오류 발생: {e}")
    finally:
        print("클라이언트 연결 종료.")

if __name__ == "__main__":
    asyncio.run(send_message())

WebDriver 프로토콜 - HTTP를 통해 브라우저 제어

WebDriver 프로토콜은 웹 브라우저 자동화를 위한 표준 프로토콜입니다. 브라우저 동작을 제어하는 API 세트를 정의하며, 다양한 프로그래밍 언어(Python, Java, C# 등) 및 테스트 프레임워크(Selenium 등)와 통합될 수 있습니다.

WebDriver 프로토콜의 주요 기능:

  • 브라우저 제어: 페이지 열기, 링크 클릭, 폼 채우기 등
  • 요소 찾기 및 조작: DOM 요소 검색, 텍스트 입력, 클릭 등
  • JavaScript 실행
  • 스크린샷 캡처

CDP (Chrome DevTools Protocol) - WebSocket 기반 (Pyppeteer / Playwright)

Chrome DevTools Protocol (CDP)은 Chrome 브라우저와 통신하기 위한 프로토콜입니다. 개발자가 HTTP를 통해 브라우저 동작을 제어하고 디버깅할 수 있게 합니다. Pyppeteer, Playwright와 같은 라이브러리는 CDP를 사용하여 브라우저를 제어합니다.

CDP 프로토콜의 주요 기능:

  • 브라우저 상태 정보 얻기
  • 브라우저 동작 제어
  • JavaScript 코드 디버깅 (중단점 설정, 단계별 실행 등)
  • 페이지 정보 획득 (DOM, CSS, 네트워크 요청 등)
  • 스크린샷 캡처

Python으로 이메일 보내기

Python에서는 smtplib 모듈을 사용하여 SMTP 프로토콜(이메일 발송)을, imaplib 모듈을 사용하여 IMAP 프로토콜(이메일 수신), poplib 모듈을 사용하여 POP3 프로토콜(이메일 수신)을 구현할 수 있습니다. 다음은 smtplibemail 모듈을 사용하여 이메일을 보내는 예제입니다.


import smtplib
from email.mime.text import MIMEText
from email.header import Header # 헤더를 MIME 형식으로 처리하기 위해 필요

# --- 설정 ---
sender_email = "your_email@example.com" # 발신자 이메일 주소
sender_password = "your_app_password"   # 앱 비밀번호 또는 메일 앱 비밀번호
receiver_email = "recipient@example.com" # 수신자 이메일 주소
smtp_server = "smtp.example.com"      # SMTP 서버 주소 (예: smtp.qq.com, smtp.gmail.com)
smtp_port = 465                       # SMTP 서버 포트 (SSL: 465, TLS: 587)
email_subject = "Test Email from Python"
email_body = "This is a test email sent from a Python script using smtplib."

# 1. 이메일 내용 생성 (MIME 형식)
# MIMEText(text, _subtype='plain', _charset='utf-8')
msg = MIMEText(email_body, 'plain', 'utf-8')

# 2. 이메일 헤더 설정
msg['From'] = Header(sender_email, 'utf-8') # 발신자
msg['To'] = Header(receiver_email, 'utf-8') # 수신자
msg['Subject'] = Header(email_subject, 'utf-8') # 제목

try:
    # 3. SMTP 서버에 연결 및 로그인
    # SSL/TLS 사용 시: smtplib.SMTP_SSL(smtp_server, smtp_port)
    # TLS 사용 시: smtplib.SMTP(smtp_server, smtp_port); server.starttls()
    with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
        server.login(sender_email, sender_password) # 로그인
        print("SMTP 서버 로그인 성공.")

        # 4. 이메일 발송
        # sendmail(from_addr, to_addrs, msg.as_string())
        # to_addrs는 리스트 형태여야 함: [receiver_email]
        server.sendmail(sender_email, [receiver_email], msg.as_string())
        print(f"이메일을 {receiver_email}에게 성공적으로 보냈습니다.")

except smtplib.SMTPAuthenticationError:
    print("SMTP 인증 오류: 이메일 주소 또는 비밀번호가 올바르지 않습니다.")
    print("Gmail의 경우 '앱 비밀번호'를 사용해야 할 수 있습니다.")
except smtplib.SMTPConnectError:
    print(f"SMTP 서버 연결 오류: {smtp_server}:{smtp_port}")
except Exception as e:
    print(f"이메일 발송 중 오류 발생: {e}")

Exchange 이메일 사용

Microsoft Exchange 서버를 사용하는 경우, exchangelib 라이브러리를 사용하여 기업 이메일에 연결하고 이메일을 보낼 수 있습니다.

설치: pip install exchangelib


from exchangelib import Account, Credentials, Message, HTMLBody, Mailbox, FileAttachment
import os

# --- Exchange 계정 정보 ---
# 실제 Exchange 서비스 주소, 사용자 이름, 비밀번호로 변경해야 합니다.
# Autodiscover를 사용하지 않는 경우, explicit_server_name 사용
EXCHANGE_EMAIL = "your_exchange_user@yourcompany.com"
EXCHANGE_PASSWORD = "your_password"
# EXCHANGED_SERVER = "outlook.office365.com" # 또는 회사 Exchange 서버 주소

# --- 메시지 정보 ---
recipient_email = "recipient@example.com"
email_subject = "Exchange Test Email with Attachment"
html_content = "<h1>Hello from ExchangeLib!</h1><p>This is an HTML email with an attachment.</p>"
attachment_path = "path/to/your/attachment.pdf" # 첨부할 파일 경로

try:
    # 계정 설정
    credentials = Credentials(username=EXCHANGE_EMAIL, password=EXCHANGE_PASSWORD)
    # autodiscover=True를 사용하면 서버 주소를 자동으로 찾습니다.
    # 특정 서버를 지정하려면 explicit_server_name=EXCHANGED_SERVER 를 추가합니다.
    account = Account(primary_smtp_address=EXCHANGE_EMAIL,
                      credentials=credentials,
                      autodiscover=True,
                      access_type='delegate') # 또는 'primary'

    # 이메일 메시지 생성
    email_message = Message(
        account=account,
        subject=email_subject,
        body=HTMLBody(html_content), # HTML 형식 본문
        to_recipients=[Mailbox(email_address=recipient_email)] # 수신자 리스트
    )

    # 첨부 파일 추가 (파일이 존재하는지 확인)
    if os.path.exists(attachment_path):
        with open(attachment_path, 'rb') as f:
            file_content = f.read()
        file_attachment = FileAttachment(name=os.path.basename(attachment_path), content=file_content)
        email_message.attach(file_attachment)
        print(f"첨부 파일 '{os.path.basename(attachment_path)}' 추가됨.")
    else:
        print(f"경고: 첨부 파일 '{attachment_path}'를 찾을 수 없습니다. 첨부 없이 메일 전송 시도.")

    # 이메일 발송
    email_message.send()
    print(f"Exchange 이메일을 '{recipient_email}'에게 성공적으로 보냈습니다.")

except Exception as e:
    print(f"Exchange 이메일 발송 중 오류 발생: {e}")

Python SSH/SFTP 프로토콜

Python에서는 paramiko 라이브러리를 사용하여 SSH 프로토콜을 구현할 수 있습니다.

설치: pip install paramiko


import paramiko
import os

# SSH 서버 접속 정보
SSH_HOST = "your_ssh_server.com" # 실제 SSH 서버 주소
SSH_PORT = 22
SSH_USERNAME = "your_ssh_username"
SSH_PASSWORD = "your_ssh_password" # 또는 SSH 키 사용

# 로컬 및 원격 파일 경로
local_file_to_upload = "local_upload.txt"
remote_upload_path = "/home/user/remote_upload.txt" # 원격 경로
remote_file_to_download = "/home/user/remote_download.txt" # 원격 파일
local_download_path = "local_download.txt"

# 예시를 위한 로컬 파일 생성
with open(local_file_to_upload, "w") as f:
    f.write("This is a test file for uploading via SFTP.\n")

ssh_client = None
sftp_client = None

try:
    # SSH 클라이언트 생성
    ssh_client = paramiko.SSHClient()
    # 자동 호스트 키 추가 정책 설정 (보안상 주의 필요, 프로덕션 환경에서는 신중하게 사용)
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # SSH 서버에 연결
    print(f"SSH 서버 {SSH_HOST}:{SSH_PORT}에 연결 중...")
    ssh_client.connect(SSH_HOST, port=SSH_PORT, username=SSH_USERNAME, password=SSH_PASSWORD)
    print("SSH 연결 성공.")

    # SFTP 클라이언트 열기
    sftp_client = ssh_client.open_sftp()
    print("SFTP 클라이언트 열기 성공.")

    # 1. 원격 파일 업로드
    print(f"파일 업로드 중: '{local_file_to_upload}' -> '{remote_upload_path}'")
    sftp_client.put(local_file_to_upload, remote_upload_path)
    print("파일 업로드 완료.")

    # 2. 원격 파일 다운로드
    print(f"파일 다운로드 중: '{remote_file_to_download}' -> '{local_download_path}'")
    sftp_client.get(remote_file_to_download, local_download_path)
    print("파일 다운로드 완료.")

    # 3. 원격 파일 목록 보기 (예: /home/user 디렉토리)
    print("\n원격 디렉토리 '/home/user'의 파일 목록:")
    for entry in sftp_client.listdir('/home/user'):
        print(f"- {entry}")

    # 4. 원격 명령 실행 (예: ls -l)
    print("\n원격 명령 실행 (ls -l):")
    # exec_command는 stdin, stdout, stderr를 반환
    stdin, stdout, stderr = ssh_client.exec_command("ls -l /home/user")
    # stdout.read()는 바이트를 반환하므로 decode() 필요
    command_output = stdout.read().decode('utf-8')
    command_error = stderr.read().decode('utf-8')
    if command_output:
        print(command_output)
    if command_error:
        print(f"명령 실행 오류: {command_error}")

except paramiko.AuthenticationException:
    print("SSH 인증 실패. 사용자 이름 또는 비밀번호(키)를 확인하세요.")
except paramiko.SSHException as sshException:
    print(f"SSH 연결 중 오류 발생: {sshException}")
except FileNotFoundError:
    print("오류: 지정된 로컬 또는 원격 파일을 찾을 수 없습니다.")
except Exception as e:
    print(f"일반 오류 발생: {e}")
finally:
    # SFTP 및 SSH 연결 종료
    if sftp_client:
        sftp_client.close()
        print("SFTP 클라이언트 종료.")
    if ssh_client:
        ssh_client.close()
        print("SSH 연결 종료.")
    # 예시로 생성한 로컬 파일 삭제
    if os.path.exists(local_file_to_upload):
        os.remove(local_file_to_upload)
    if os.path.exists(local_download_path):
        os.remove(local_download_path)

태그: python concurrency Multithreading multiprocessing asyncio

6월 19일 17:47에 게시됨