다중 프로세스의 필요성과 multiprocessing 모듈
Python의 GIL(Global Interpreter Lock)로 인해 멀티스레딩은 CPU 집약적 작업에서 멀티코어를 효과적으로 활용할 수 없습니다. 이를 극복하기 위해 multiprocessing 모듈을 사용하면 각 프로세스가 독립된 메모리 공간과 Python 인터프리터를 가지므로, CPU 자원을 최대한 활용할 수 있습니다. 이 모듈은 프로세스 생성, 데이터 공유, 동기화 등을 위한 다양한 컴포넌트를 제공합니다.
Process 클래스: 프로세스 생성의 핵심
모든 프로세스는 Process 클래스의 인스턴스입니다. 이 클래스를 통해 하위 프로세스를 정의하고 제어할 수 있습니다.
주요 매개변수
- target: 새 프로세스가 실행할 함수.
- args:
target함수에 전달할 위치 인수 튜플 (단일 인수 후에는 쉼표 필수). - kwargs:
target함수에 전달할 키워드 인수 딕셔너리. - name: 프로세스의 이름.
주요 메서드와 속성
<span></span># 프로세스 시작
process.start()
# 부모 프로세스가 자식 프로세스 종료를 대기
process.join()
# 프로세스 강제 종료
process.terminate()
# 프로세스 활성 상태 확인
process.is_alive()
# 프로세스 ID 및 이름 조회
print(process.pid)
print(process.name)
Windows 환경에서의 주의사항
Unix 계열 시스템은 fork()를 사용하여 프로세스를 생성하지만, Windows는 spawn 방식을 사용합니다. 이는 새로운 Python 인터프리터를 시작하고 모듈을 다시 임포트하는 것을 의미합니다. 따라서 아래 코드 블록 없이 Process() 객체를 생성하면 무한 재귀 생성이 발생할 수 있습니다.
<span></span>if __name__ == '__main__':
p = Process(target=some_function)
p.start()
Process 생성 방법
두 가지 방법으로 프로세스를 생성할 수 있습니다.
방법 1: 함수 기반
<span></span>from multiprocessing import Process
import time
def task(name):
print(f"{name} 작업 시작")
time.sleep(2)
print(f"{name} 작업 완료")
if __name__ == '__main__':
p = Process(target=task, args=('작업자',))
p.start()
p.join()
방법 2: 클래스 상속 기반
<span></span>from multiprocessing import Process
import time
class Worker(Process):
def __init__(self, name):
super().__init__()
self.worker_name = name
def run(self):
print(f"{self.worker_name} 작업 시작")
time.sleep(2)
print(f"{self.worker_name} 작업 완료")
if __name__ == '__main__':
w = Worker('클래스 기반 작업자')
w.start()
w.join()
메모리 격리와 join() 메서드의 진정한 의미
각 프로세스는 고유한 메모리 공간을 가지므로, 변수 변경 사항은 다른 프로세스에 반영되지 않습니다.
<span></span>from multiprocessing import Process
counter = 0
def modify_counter():
global counter
counter = 100
print(f"자식 프로세스 내 counter: {counter}")
if __name__ == '__main__':
p = Process(target=modify_counter)
p.start()
p.join() # 부모가 여기서 대기
print(f"부모 프로세스 내 counter: {counter}") # 여전히 0 출력
join()은 자식 프로세스가 종료될 때까지 부모 프로세스의 실행을 일시 중지시킵니다. 이는 프로그램 흐름을 동기화하는 데 매우 중요합니다.
데몬 프로세스
데몬 프로세스는 부모 프로세스가 종료되면 함께 종료됩니다. 서버나 백그라운드 작업에 유용합니다.
<span></span>def background_task():
while True:
print("데몬 프로세스 실행 중...")
time.sleep(1)
if __name__ == '__main__':
d = Process(target=background_task)
d.daemon = True # 반드시 start() 전에 설정
d.start()
print("메인 프로세스 종료")
# d.join() 없음, 메인 프로세스가 종료되며 d도 종료됨
공유 리소스 보호를 위한 락(Lock)
프로세스는 메모리를 공유하지 않지만, 파일이나 표준 출력 같은 시스템 리소스는 공유하므로 경쟁 조건이 발생할 수 있습니다. Lock을 사용해 이를 방지합니다.
<span></span>from multiprocessing import Process, Lock
def safe_print(message, lock):
with lock: # 크리티컬 섹션 시작
print(message)
print("-------------------------") # 여러 줄 출력을 하나의 원자적 작업처럼
# 크리티컬 섹션 종료
if __name__ == '__main__':
lock = Lock()
processes = []
for i in range(3):
p = Process(target=safe_print, args=(f"프로세스 {i}", lock))
p.start()
processes.append(p)
for p in processes:
p.join()
메시지 기반 통신: Queue와 JoinableQueue
데이터 공유보다 메시지 전달이 더 안전하고 확장성이 좋습니다. Queue는 스레드/프로세스 간 안전한 데이터 교환을 제공하며, 내부적으로 락을 처리합니다.
기본 Queue 사용 예제
<span></span>from multiprocessing import Process, Queue
def producer(q):
for item in ['사과', '바나나', '체리']:
q.put(item)
print(f"생산: {item}")
q.put(None) # 종료 신호
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"소비: {item}")
if __name__ == '__main__':
queue = Queue()
p = Process(target=producer, args=(queue,))
c = Process(target=consumer, args=(queue,))
p.start(); c.start()
p.join(); c.join()
JoinableQueue로 작업 완료 추적
여러 생산자가 있을 때 모든 작업이 완료되었음을 알기 위해 JoinableQueue를 사용합니다.
<span></span>from multiprocessing import Process, JoinableQueue
def producer_advanced(q, name):
for i in range(2):
item = f"{name}-{i}"
q.put(item)
print(f"생산자 {name}: {item}")
q.join() # 모든 항목이 소비될 때까지 대기
def consumer_advanced(q):
while True:
item = q.get()
print(f"소비자: {item} 처리 중")
time.sleep(1)
q.task_done() # 작업 완료 신호
if __name__ == '__main__':
jq = JoinableQueue()
producers = [Process(target=producer_advanced, args=(jq, f"P{i}")) for i in range(2)]
consumer_proc = Process(target=consumer_advanced, args=(jq,))
consumer_proc.daemon = True # 소비자는 데몬으로 설정
for p in producers:
p.start()
consumer_proc.start()
for p in producers:
p.join()
print("모든 생산자 종료")
고급 도구: Pool
많은 수의 짧은 작업을 처리할 때 Pool을 사용하면 효율적입니다. 프로세스 풀을 관리하여 오버헤드를 줄이고, 작업을 쉽게 분배할 수 있습니다.
<span></span>from multiprocessing import Pool
import os
def cpu_intensive_task(n):
sum(i * i for i in range(n))
return os.getpid()
if __name__ == '__main__':
with Pool() as pool: # CPU 코어 수만큼 프로세스 생성
results = pool.map(cpu_intensive_task, [10000, 20000, 30000])
print(results)