Python asyncio: 비동기 환경에서 블로킹 작업 실행

비동기 프로그래밍 환경, 특히 asyncio를 사용할 때 블로킹(blocking) 작업은 주의해야 할 대상입니다. 이벤트 루프가 하나의 스레드에서 실행되는 asyncio에서는 블로킹 작업이 발생하면 해당 스레드가 멈추어 다른 모든 비동기 작업(코루틴)의 실행을 방해하게 됩니다.

이는 CPU 집약적인 계산, 파일 입출력과 같이 시간이 오래 걸리는 작업, 또는 asyncio를 지원하지 않는 외부 라이브러리를 호출할 때 발생할 수 있습니다. 이러한 블로킹 작업을 asyncio 프로그램 내에서 비동기적으로 처리하는 방법을 알아보겠습니다.

비동기 방식으로 블로킹 작업 처리하기

asyncio는 두 가지 주요 방법을 제공하여 블로킹 작업을 비동기적으로 실행할 수 있도록 지원합니다.

1. asyncio.to_thread() 사용

asyncio.to_thread()는 고수준 API로, 사용자가 직접 스레드 풀을 관리할 필요 없이 블로킹 함수를 별도의 스레드에서 실행하도록 간편하게 지원합니다. 이 함수는 실행할 함수와 해당 함수의 인자를 전달받아, 해당 함수를 별도의 스레드에서 실행하는 코루틴을 반환합니다. 반환된 코루틴은 await 키워드를 사용하여 기다리거나, asyncio.create_task()를 통해 작업으로 생성하여 예약할 수 있습니다.


# 별도의 스레드에서 함수 실행
await asyncio.to_thread(blocking_function, arg1, arg2)

asyncio.to_thread()는 내부적으로 ThreadPoolExecutor를 사용하여 작업을 처리하므로, I/O 바운드 작업에 주로 적합합니다. 작업이 시작되기까지는 반환된 코루틴이 이벤트 루프에서 실행될 기회를 얻기 전까지 대기합니다.

2. loop.run_in_executor() 사용

loop.run_in_executor()는 저수준 API로, 먼저 현재 실행 중인 이벤트 루프를 가져와야 합니다. asyncio.get_running_loop() 함수를 통해 이벤트 루프 객체를 얻을 수 있습니다.

이 함수는 실행자(executor)와 실행할 함수를 인자로 받습니다. 실행자를 지정하지 않으면 기본적으로 ThreadPoolExecutor가 사용됩니다. 이 함수는 실행이 가능한 객체(awaitable)를 반환하며, 이를 await하여 작업 완료를 기다릴 수 있습니다. asyncio.to_thread()와 달리, 이 함수는 즉시 작업을 시작하므로 반환된 객체를 기다릴 필요 없이 작업이 바로 시작됩니다.


# 이벤트 루프 가져오기
loop = asyncio.get_running_loop()
# 기본 실행자(ThreadPoolExecutor)를 사용하여 함수 실행
await loop.run_in_executor(None, blocking_function, arg1, arg2)

특정 실행자(예: ProcessPoolExecutor)를 직접 생성하여 전달할 수도 있습니다. 이 경우, 사용자는 생성한 실행자를 직접 관리하고 작업 완료 후 적절히 종료해야 합니다.


from concurrent.futures import ProcessPoolExecutor

# 프로세스 풀 생성
with ProcessPoolExecutor() as executor:
    # 이벤트 루프 가져오기
    loop = asyncio.get_running_loop()
    # 지정된 실행자(ProcessPoolExecutor)를 사용하여 함수 실행
    await loop.run_in_executor(executor, blocking_function, arg1, arg2)
    # 프로세스 풀은 자동으로 종료됩니다.

이 두 가지 방법 모두 asyncio 프로그램 내에서 블로킹 작업을 별도의 스레드나 프로세스에서 비동기적으로 실행할 수 있게 해줍니다.

실행 예제

asyncio.to_thread()를 사용하여 I/O 바운드 블로킹 작업을 비동기적으로 처리하는 예제를 살펴보겠습니다. 이 예제에서는 몇 초간 실행을 지연시키는 함수를 정의하고, 이를 asyncio.to_thread()를 통해 asyncio의 스레드 풀에서 비동기적으로 실행합니다.


import asyncio
import time

# 블로킹 작업을 수행하는 함수
def simulate_io_operation():
    print("I/O 작업 시작")
    time.sleep(3)  # 3초간 실행을 멈춤 (I/O 대기 시뮬레이션)
    print("I/O 작업 완료")

# 메인 코루틴
async def main_process():
    print("메인 프로세스: 블로킹 작업 시작 준비")
    # 블로킹 함수를 별도 스레드에서 실행하도록 코루틴 생성
    io_coroutine = asyncio.to_thread(simulate_io_operation)
    # 생성된 코루틴을 작업으로 등록
    io_task = asyncio.create_task(io_coroutine)

    print("메인 프로세스: 다른 작업 수행 중...")
    # 다른 작업이 진행될 수 있도록 잠시 대기 (이벤트 루프 양보)
    await asyncio.sleep(0.1) 

    print("메인 프로세스: 블로킹 작업 완료 대기")
    # 블로킹 작업이 완료될 때까지 기다림
    await io_task
    print("메인 프로세스: 모든 작업 완료")

# asyncio 프로그램 실행
if __name__ == "__main__":
    asyncio.run(main_process())

예상 출력


메인 프로세스: 블로킹 작업 시작 준비
메인 프로세스: 다른 작업 수행 중...
I/O 작업 시작
메인 프로세스: 블로킹 작업 완료 대기
I/O 작업 완료
메인 프로세스: 모든 작업 완료

위 예제는 main_process() 코루틴을 시작으로 실행됩니다. 메인 코루틴은 메시지를 출력하고, asyncio.to_thread()를 통해 simulate_io_operation() 함수를 백그라운드 스레드에서 실행하도록 예약합니다. 메인 코루틴은 다른 작업을 수행할 수 있으며, 이 예제에서는 asyncio.sleep(0.1)을 통해 예약된 작업이 시작될 시간을 줍니다.

백그라운드 스레드에서 simulate_io_operation() 함수가 실행되어 메시지를 출력하고 3초간 대기한 후, 완료 메시지를 출력합니다. 메인 코루틴은 await io_task를 통해 예약된 작업이 완료될 때까지 기다렸다가, 모든 작업이 완료되었음을 알리는 메시지를 출력하고 종료됩니다. 이 과정을 통해 메인 스레드는 블로킹 작업 중에 멈추지 않고 다른 비동기 작업을 처리할 수 있습니다.

태그: asyncio python threading concurrency async

5월 21일 07:21에 게시됨