1. 스레드의 데드락과 재귀 락 (RLock)
데드락은 두 개 이상의 스레드가 서로 자원을 기다리며 진행이 멈추는 현상이다. 예를 들어, 스레드 A가 자원 1을 점유하고 자원 2를 요청하고, 동시에 스레드 B가 자원 2를 점유하고 자원 1을 요청하면, 둘 다 대기 상태에 빠져 프로그램이 정지된다.
이 문제를 해결하기 위해 파이썬에서는 RLock(Reentrant Lock)을 제공한다. 이는 동일한 스레드에서 여러 번 acquire() 호출이 가능하도록 하며, 내부 카운터를 통해 획득 횟수를 관리한다. 모든 획득이 해제될 때까지 다른 스레드는 해당 자원에 접근할 수 없다.
from threading import Thread, RLock
import time
lock = RLock()
class Worker(Thread):
def run(self):
self.step_one()
self.step_two()
def step_one(self):
lock.acquire()
print(f"{self.name} - 자원 1 획득")
time.sleep(0.5)
lock.acquire() # 재진입 가능
print(f"{self.name} - 자원 2 획득")
lock.release()
lock.release()
def step_two(self):
lock.acquire()
print(f"{self.name} - 자원 2 재획득")
lock.acquire()
print(f"{self.name} - 자원 1 재획득")
lock.release()
lock.release()
if __name__ == "__main__":
for i in range(5):
t = Worker()
t.start()
2. 세마포어 (Semaphore)
세마포어는 동시 실행 가능한 스레드 수를 제한하는 도구이다. 내부 카운터를 사용하여 acquire() 시 감소, release() 시 증가하며, 카운터가 0이 되면 스레드는 대기 상태가 된다.
예를 들어, 웹 크롤링 시 최대 3개의 스레드만 동시에 작업하도록 제한할 수 있다.
import threading
import time
semaphore = threading.Semaphore(3)
class Spider(threading.Thread):
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
semaphore.acquire()
print(f"크롤링 시작: {self.url}")
time.sleep(2)
print(f"크롤링 완료: {self.url}")
semaphore.release()
class UrlProducer(threading.Thread):
def run(self):
for i in range(10):
spider = Spider(f"http://example.com/page{i}")
spider.start()
if __name__ == "__main__":
producer = UrlProducer()
producer.start()
3. 이벤트 (Event)
Event 객체는 스레드 간 통신을 위한 신호 전달 메커니즘이다. 하나의 스레드가 set()로 상태를 변경하면, 다른 스레드는 wait()를 통해 그 신호를 감지하고 다음 작업을 수행할 수 있다.
from threading import Thread, Event
import time
event = Event()
def traffic_light():
print("빨간불 켜짐")
time.sleep(3)
event.set() # 초록불으로 전환
def car(name):
print(f"차 {name} 대기 중...")
event.wait() # 초록불이 될 때까지 대기
print(f"차 {name} 통과")
if __name__ == "__main__":
light_thread = Thread(target=traffic_light)
light_thread.start()
for i in range(5):
car_thread = Thread(target=car, args=(i,))
car_thread.start()
4. 스레드 큐 (Queue)
파이썬의 queue 모듈은 스레드 안전한 큐를 제공한다. 주요 유형은 다음과 같다:
- FIFO: 먼저 들어온 것이 먼저 나감 (기본 큐)
- LifoQueue: 마지막에 들어온 것이 먼저 나감 (후입선출)
- PriorityQueue: 우선순위가 높은 항목부터 처리됨
import queue
import threading
q = queue.Queue(maxsize=5)
def producer():
for i in range(10):
q.put(i)
print(f"값 {i} 큐에 저장됨")
q.join() # 모든 작업 완료 대기
print("생산자 완료")
def consumer():
while True:
try:
item = q.get(timeout=1)
print(f"값 {item} 꺼냄")
q.task_done()
except queue.Empty:
break
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
5. 프로세스 풀과 스레드 풀
concurrent.futures 모듈은 ProcessPoolExecutor와 ThreadPoolExecutor를 제공하여 비동기 작업을 효율적으로 관리한다.
from concurrent.futures import ProcessPoolExecutor
import os
import time
def worker(name):
print(f"작업자 {name} (PID: {os.getpid()}) 시작")
time.sleep(2)
return f"완료: {name}"
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(worker, f"작업_{i}") for i in range(8)]
results = [f.result() for f in futures]
for res in results:
print(res)
print("메인 종료")
6. 콜백 함수 (Callback)
비동기 작업 후 결과를 처리하기 위해 add_done_callback()를 사용할 수 있다. 이 함수는 작업 완료 시 자동으로 호출되며, 결과 값을 인자로 받는다.
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_page(url):
response = requests.get(url)
return {"url": url, "length": len(response.text)}
def on_complete(future):
result = future.result()
print(f"페이지 길이: {result['url']} → {result['length']}자")
if __name__ == "__main__":
urls = ["https://httpbin.org/delay/1", "https://httpbin.org/json"]
with ThreadPoolExecutor(max_workers=2) as executor:
for url in urls:
future = executor.submit(fetch_page, url)
future.add_done_callback(on_complete)
7. 타이머 (Timer)
Timer는 지정된 시간 이후 특정 함수를 실행하는 스레드 기반 클래스이다. cancel() 메서드로 취소 가능하다.
from threading import Timer
def alarm(message):
print(message)
if __name__ == "__main__":
timer = Timer(3.0, alarm, args=["알람!"])
timer.start()
time.sleep(1)
timer.cancel() # 타이머 취소