파이썬의 재귀 락, 세마포어, 이벤트, 스레드 큐, 프로세스 및 스레드 풀, 콜백 함수, 타이머

1. 스레드의 데드락과 재귀 락 (RLock)

데드락은 두 개 이상의 스레드가 서로 자원을 기다리며 진행이 멈추는 현상이다. 예를 들어, 스레드 A가 자원 1을 점유하고 자원 2를 요청하고, 동시에 스레드 B가 자원 2를 점유하고 자원 1을 요청하면, 둘 다 대기 상태에 빠져 프로그램이 정지된다.

이 문제를 해결하기 위해 파이썬에서는 RLock(Reentrant Lock)을 제공한다. 이는 동일한 스레드에서 여러 번 acquire() 호출이 가능하도록 하며, 내부 카운터를 통해 획득 횟수를 관리한다. 모든 획득이 해제될 때까지 다른 스레드는 해당 자원에 접근할 수 없다.

from threading import Thread, RLock
import time

lock = RLock()

class Worker(Thread):
    def run(self):
        self.step_one()
        self.step_two()

    def step_one(self):
        lock.acquire()
        print(f"{self.name} - 자원 1 획득")
        time.sleep(0.5)
        lock.acquire()  # 재진입 가능
        print(f"{self.name} - 자원 2 획득")
        lock.release()
        lock.release()

    def step_two(self):
        lock.acquire()
        print(f"{self.name} - 자원 2 재획득")
        lock.acquire()
        print(f"{self.name} - 자원 1 재획득")
        lock.release()
        lock.release()

if __name__ == "__main__":
    for i in range(5):
        t = Worker()
        t.start()

2. 세마포어 (Semaphore)

세마포어는 동시 실행 가능한 스레드 수를 제한하는 도구이다. 내부 카운터를 사용하여 acquire() 시 감소, release() 시 증가하며, 카운터가 0이 되면 스레드는 대기 상태가 된다.

예를 들어, 웹 크롤링 시 최대 3개의 스레드만 동시에 작업하도록 제한할 수 있다.

import threading
import time

semaphore = threading.Semaphore(3)

class Spider(threading.Thread):
    def __init__(self, url):
        super().__init__()
        self.url = url

    def run(self):
        semaphore.acquire()
        print(f"크롤링 시작: {self.url}")
        time.sleep(2)
        print(f"크롤링 완료: {self.url}")
        semaphore.release()

class UrlProducer(threading.Thread):
    def run(self):
        for i in range(10):
            spider = Spider(f"http://example.com/page{i}")
            spider.start()

if __name__ == "__main__":
    producer = UrlProducer()
    producer.start()

3. 이벤트 (Event)

Event 객체는 스레드 간 통신을 위한 신호 전달 메커니즘이다. 하나의 스레드가 set()로 상태를 변경하면, 다른 스레드는 wait()를 통해 그 신호를 감지하고 다음 작업을 수행할 수 있다.

from threading import Thread, Event
import time

event = Event()

def traffic_light():
    print("빨간불 켜짐")
    time.sleep(3)
    event.set()  # 초록불으로 전환

def car(name):
    print(f"차 {name} 대기 중...")
    event.wait()  # 초록불이 될 때까지 대기
    print(f"차 {name} 통과")

if __name__ == "__main__":
    light_thread = Thread(target=traffic_light)
    light_thread.start()

    for i in range(5):
        car_thread = Thread(target=car, args=(i,))
        car_thread.start()

4. 스레드 큐 (Queue)

파이썬의 queue 모듈은 스레드 안전한 큐를 제공한다. 주요 유형은 다음과 같다:

  • FIFO: 먼저 들어온 것이 먼저 나감 (기본 큐)
  • LifoQueue: 마지막에 들어온 것이 먼저 나감 (후입선출)
  • PriorityQueue: 우선순위가 높은 항목부터 처리됨
import queue
import threading

q = queue.Queue(maxsize=5)

def producer():
    for i in range(10):
        q.put(i)
        print(f"값 {i} 큐에 저장됨")
    q.join()  # 모든 작업 완료 대기
    print("생산자 완료")

def consumer():
    while True:
        try:
            item = q.get(timeout=1)
            print(f"값 {item} 꺼냄")
            q.task_done()
        except queue.Empty:
            break

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

t1.join()
t2.join()

5. 프로세스 풀과 스레드 풀

concurrent.futures 모듈은 ProcessPoolExecutorThreadPoolExecutor를 제공하여 비동기 작업을 효율적으로 관리한다.

from concurrent.futures import ProcessPoolExecutor
import os
import time

def worker(name):
    print(f"작업자 {name} (PID: {os.getpid()}) 시작")
    time.sleep(2)
    return f"완료: {name}"

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(worker, f"작업_{i}") for i in range(8)]
        results = [f.result() for f in futures]
        for res in results:
            print(res)
    print("메인 종료")

6. 콜백 함수 (Callback)

비동기 작업 후 결과를 처리하기 위해 add_done_callback()를 사용할 수 있다. 이 함수는 작업 완료 시 자동으로 호출되며, 결과 값을 인자로 받는다.

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_page(url):
    response = requests.get(url)
    return {"url": url, "length": len(response.text)}

def on_complete(future):
    result = future.result()
    print(f"페이지 길이: {result['url']} → {result['length']}자")

if __name__ == "__main__":
    urls = ["https://httpbin.org/delay/1", "https://httpbin.org/json"]
    with ThreadPoolExecutor(max_workers=2) as executor:
        for url in urls:
            future = executor.submit(fetch_page, url)
            future.add_done_callback(on_complete)

7. 타이머 (Timer)

Timer는 지정된 시간 이후 특정 함수를 실행하는 스레드 기반 클래스이다. cancel() 메서드로 취소 가능하다.

from threading import Timer

def alarm(message):
    print(message)

if __name__ == "__main__":
    timer = Timer(3.0, alarm, args=["알람!"])
    timer.start()
    time.sleep(1)
    timer.cancel()  # 타이머 취소

태그: python threading multiprocessing RLock Semaphore

6월 1일 23:56에 게시됨