멀티프로세싱이 필요한 이유
현대 애플리케이션은 점점 더 많은 계산 리소스를 필요로 하며, 단일 스레드로는 한계에 부딪히기 쉽습니다. Python에서는 멀티프로세싱을 통해 CPU 집약적인 작업을 병렬화하고, 다중 코어를 효율적으로 활용할 수 있습니다. 주요 장점은 다음과 같습니다:
- 다중 코어 활용: 각 프로세스가 독립된 메모리 공간과 CPU 코어를 사용하여 전체 처리 속도 향상.
- CPU 집약 작업 최적화: 이미지 처리, 수치 계산, 데이터 변환 등 무거운 연산을 분산 처리 가능.
- 결함 격리: 하나의 프로세스가 충돌해도 다른 프로세스는 영향을 받지 않아 시스템 안정성 향상.
- 진정한 병렬성: GIL(Global Interpreter Lock)의 제약을 피하기 위해 스레드 대신 프로세스를 사용하는 것이 유리함.
주요 모듈 개요
Python은 멀티프로세싱을 지원하는 두 가지 핵심 모듈을 제공합니다:
multiprocessing: 저수준 인터페이스로 프로세스 생성, 통신, 동기화 등을 세밀하게 제어 가능.concurrent.futures: 고수준 추상화를 제공하여 간편한 태스크 디스패치와 결과 수집이 가능.
기본 프로세스 생성 및 관리
multiprocessing.Process 클래스를 사용하면 개별 작업을 별도의 프로세스로 실행할 수 있습니다.
import multiprocessing
def task_handler(identifier):
print(f"작업자 {identifier}가 작업 중입니다.")
if __name__ == "__main__":
workers = []
for idx in range(4):
proc = multiprocessing.Process(target=task_handler, args=(idx,))
workers.append(proc)
proc.start()
for worker in workers:
worker.join()
print("모든 작업 완료")
위 코드는 4개의 자식 프로세스를 생성하고, 각각이 독립적으로 함수를 실행하도록 합니다. join()을 통해 부모 프로세스가 모든 자식 종료를 기다립니다.
고수준 실행기 사용법
ProcessPoolExecutor는 반복적인 작업을 자동으로 프로세스 풀에 할당해주므로 관리가 간편합니다.
import concurrent.futures
def task_handler(identifier):
return f"작업자 {identifier} 완료"
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor() as pool:
outcomes = pool.map(task_handler, range(4))
for outcome in outcomes:
print(outcome)
print("전체 작업 종료")
이 방식은 내부적으로 프로세스 재사용과 오버헤드 최소화를 수행하므로 짧은 작업에도 적합합니다.
프로세스 간 데이터 교환 방법
프로세스는 서로 메모리를 공유하지 않으므로 명시적인 통신 수단이 필요합니다. 주요 방법은 다음과 같습니다:
1. 큐(Queue)를 통한 전달
FIFO 기반의 비동기 통신에 적합합니다.
from multiprocessing import Process, Queue
def producer(q):
for i in range(3):
q.put(f"데이터_{i}")
def consumer(q):
while not q.empty():
print("받은 데이터:", q.get())
if __name__ == "__main__":
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start(); p1.join()
p2.start(); p2.join()
2. 파이프(Pipe)를 이용한 양방향 통신
두 프로세스 사이의 직접 연결이 필요할 때 유용합니다.
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("메시지 전송됨")
conn.close()
def receiver(conn):
msg = conn.recv()
print("수신:", msg)
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start(); p2.start()
p1.join(); p2.join()
3. 공유 메모리
공통 변수를 읽고 쓰는 경우 Value 또는 Array를 사용합니다.
from multiprocessing import Process, Value
def increment(shared_counter):
for _ in range(10000):
with shared_counter.get_lock():
shared_counter.value += 1
if __name__ == "__main__":
counter = Value('i', 0) # 정수형 값
processes = [Process(target=increment, args=(counter,)) for _ in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()
print("최종 카운터 값:", counter.value)
실제 활용 사례
예제 1: 병렬 수치 연산
대량의 숫자를 제곱하는 작업을 분산 처리합니다.
import concurrent.futures
def compute_square(n):
return n * n
data = [7, 9, 12, 15, 20]
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(compute_square, data))
print("결과:", results)
예제 2: 웹 요청 병렬 처리
여러 URL에 동시에 HTTP 요청을 보내 응답 시간을 단축합니다.
import requests
import concurrent.futures
def fetch_status(url):
try:
resp = requests.get(url, timeout=5)
return f"{url} → 상태 {resp.status_code}"
except Exception as e:
return f"{url} → 오류 발생"
urls = ["https://httpbin.org/delay/1", "https://httpbin.org/status/200"]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
responses = executor.map(fetch_status, urls)
for res in responses:
print(res)
참고: 네트워크 I/O 작업에는ThreadPoolExecutor가 더 효율적일 수 있습니다. CPU 바운드 작업에는ProcessPoolExecutor를 권장합니다.
예제 3: 대량 파일 변환
텍스트 파일들을 일괄적으로 대문자로 변환합니다.
import os
from multiprocessing import Pool
def uppercase_file(filepath):
try:
with open(filepath, 'r') as f:
content = f.read()
with open(filepath, 'w') as f:
f.write(content.upper())
return f"처리됨: {filepath}"
except Exception as e:
return f"실패: {filepath} - {e}"
if __name__ == "__main__":
target_dir = "./texts"
file_list = [os.path.join(target_dir, f) for f in os.listdir(target_dir)]
with Pool(processes=4) as pool:
results = pool.map(uppercase_file, file_list)
for r in results:
print(r)
주의사항 및 팁
- GIL 우회: CPU 집약 작업은 반드시
multiprocessing을 사용하여 GIL 영향을 피해야 함. - 직렬화 제약: 프로세스 간 전달되는 객체는 pickle 가능해야 하며, 람다나 로컬 함수는 전달 불가.
- 리소스 제한: 너무 많은 프로세스 생성은 컨텍스트 스위칭 오버헤드 증가로 이어질 수 있음.
- 윈도우 호환성: Windows에서는
if __name__ == "__main__"보호가 필수이며, 그렇지 않으면 무한 재귀 발생 가능. - I/O vs CPU 작업 구분: I/O 중심 작업은 스레딩이 더 효율적일 수 있음.