1. 동시성 프로그래밍과 코루틴(Coroutine)의 이해
1.1 코루틴의 정의와 위치
운영체제 관점에서 프로세스는 실행 중인 프로그램의 인스턴스이며, 스레드는 프로세스 내에서 실행되는 최소 단위의 작업 흐름입니다. 그렇다면 코루틴은 무엇일까요? 코루틴은 스레드 내부에서 실행되는 사용자 수준의 경량 스레드(Micro-thread)로, 단일 스레드 환경에서 동시성(Concurrency)을 구현하는 기술입니다.
하드웨어와 OS는 프로세스와 스레드만 인식하며, 코루틴은 개발자가 효율적인 컨텍스트 스위칭을 위해 고안한 소프트웨어적 개념입니다. 즉, 하나의 스레드 안에서 여러 코드 블록의 실행을 중단하고 재개하며 전환하는 사용자 모드(User-mode) 컨텍스트 스위칭 기술입니다.
1.2 코루틴의 장단점
장점:
- 낮은 전환 비용: OS 수준의 스레드 전환보다 훨씬 적은 오버헤드로 작업을 전환할 수 있습니다.
- GIL 문제 우회: Python의 GIL(Global Interpreter Lock)로 인해 멀티스레드가 CPU 바운드 작업에서 병렬성을 발휘하지 못하지만, 코루틴은 단일 스레드 내에서 I/O 바운드 작업의 동시성을 극대화하여 CPU 유휴 시간을 줄입니다.
단점:
- 멀티코어 활용 불가: 본질적으로 단일 스레드에서 동작하므로 여러 CPU 코어를 동시에 사용하는 병렬(Parallel) 처리는 불가능합니다.
- 블로킹 취약성: 비동기 I/O를 사용하지 않는 동기 블로킹 코드(예:
time.sleep, 동기 DB 쿼리)가 실행되면 해당 스레드 전체가 멈추어 다른 모든 코루틴의 실행이 차단됩니다.
1.3 동기 I/O의 한계와 비동기 프로그래밍의 필요성
네트워크 요청, 파일 읽기/쓰기, 데이터베이스 쿼리 등 I/O 집약적 작업은 완료될 때까지 대기해야 하는 시간이 깁니다. 동기(Synchronous) 방식은 이 대기 시간 동안 프로그램 전체를 블로킹시켜 CPU 자원을 낭비합니다.
import time
def fetch_user_profile():
time.sleep(4) # 4초가 소요되는 동기 I/O 작업 시뮬레이션
return {"user_id": 101, "name": "Alice"}
def fetch_order_history():
time.sleep(2) # 2초가 소요되는 동기 I/O 작업 시뮬레이션
return {"order_id": 505, "status": "Shipped"}
def sync_workflow():
user = fetch_user_profile()
print(f"사용자 데이터 수신: {user}")
order = fetch_order_history()
print(f"주문 데이터 수신: {order}")
if __name__ == '__main__':
start_time = time.time()
sync_workflow()
print(f"총 소요 시간: {time.time() - start_time:.2f}초")
위 코드에서 fetch_user_profile이 4초 동안 블로킹되면, fetch_order_history는 그 이후에야 실행되어 총 6초가 소요됩니다. 비동기 프로그래밍의 핵심은 하나의 작업이 I/O를 기다리는 동안 CPU 제어권을 다른 작업으로 넘겨주어 전체 처리 시간을 단축하는 것입니다.
2. Asyncio와 코루틴의 핵심 메커니즘
2.1 async 키워드와 코루틴 객체
Python 3.5부터 도입된 async와 await 키워드는 비동기 코드를 동기 코드처럼 직관적으로 작성할 수 있게 해줍니다. async def로 정의된 함수는 코루틴 함수이며, 이를 호출하면 실행되는 것이 아니라 코루틴 객체(Coroutine Object)가 반환됩니다.
import asyncio
async def generate_report():
print("보고서 생성 시작")
return "Report_Data"
# 코루틴 함수 호출은 객체만 생성할 뿐 실행하지 않음
coroutine_obj = generate_report()
print(f"객체 타입: {type(coroutine_obj)}")
이 코루틴 객체는 이벤트 루프(Event Loop)에 등록되어야 비로소 스케줄링되고 실행됩니다.
2.2 이벤트 루프(Event Loop)의 역할
이벤트 루프는 비동기 프로그램의 심장(Heart)으로, 실행 가능한 코루틴을 추적하고 I/O 이벤트가 발생하면 대기 중인 코루틴을 깨워 실행하는 무한 루프입니다. Python 3.7 이후에는 asyncio.run()을 사용하여 이벤트 루프의 생성, 실행, 종료를 한 번에 처리하는 것이 권장됩니다.
import asyncio
async def display_message():
print("비동기 작업 실행 중...")
# 권장되는 이벤트 루프 실행 방식
asyncio.run(display_message())
2.3 await 키워드와 제어권 양보
await는 현재 코루틴의 실행을 일시 중단하고, 대상 객체(코루틴, Task, Future)의 작업이 완료될 때까지 제어권을 이벤트 루프에 반납하는 역할을 합니다. 단, await는 반드시 '대기 가능한 객체(Awaitable)'에만 사용할 수 있습니다.
import asyncio
import time
async def download_file(file_name, delay):
print(f"[{file_name}] 다운로드 시작")
await asyncio.sleep(delay) # 비동기 대기 (이벤트 루프에 제어권 양보)
print(f"[{file_name}] 다운로드 완료")
return f"{file_name}_data"
async def sequential_download():
# 순차적 실행 (직렬)
await download_file("image.png", 2)
await download_file("document.pdf", 3)
start = time.time()
asyncio.run(sequential_download())
print(f"직렬 실행 시간: {time.time() - start:.2f}초")
위 예제에서 await download_file(...)은 해당 코루틴이 완전히 끝날 때까지 기다리므로 직렬로 실행되어 총 5초가 소요됩니다. 진정한 동시성을 위해서는 작업을 Task로 래핑하여 이벤트 루프에 즉시 등록해야 합니다.
3. 다중 작업 스케줄링: Task와 Gather
3.1 Task를 활용한 동시 실행
asyncio.create_task()는 코루틴을 Task 객체로 감싸 이벤트 루프의 실행 큐에 즉시 삽입합니다. 이를 통해 여러 작업이 병렬적으로 진행될 수 있습니다.
import asyncio
import time
async def process_data(task_id, duration):
print(f"Task {task_id} 처리 시작")
await asyncio.sleep(duration)
print(f"Task {task_id} 처리 완료")
return task_id * 10
async def concurrent_execution():
# Task 생성 및 즉시 스케줄링
task_a = asyncio.create_task(process_data(1, 3))
task_b = asyncio.create_task(process_data(2, 1))
# 결과 대기
result_a = await task_a
result_b = await task_b
print(f"결과: {result_a}, {result_b}")
start = time.time()
asyncio.run(concurrent_execution())
print(f"병렬 실행 시간: {time.time() - start:.2f}초") # 약 3초 소요
3.2 asyncio.gather를 통한 일괄 처리
여러 개의 동시성 작업을 관리하고 결과를 순서대로 수집해야 할 때 asyncio.gather가 매우 유용합니다. 이 함수는 전달된 모든 Awaitable이 완료될 때까지 기다리며, 완료된 결과 리스트를 반환합니다.
import asyncio
async def fetch_api(endpoint, delay):
await asyncio.sleep(delay)
return {"endpoint": endpoint, "status": 200}
async def main():
# 여러 코루틴을 동시에 실행하고 결과를 리스트로 취합
responses = await asyncio.gather(
fetch_api("/users", 2),
fetch_api("/posts", 1),
fetch_api("/comments", 3)
)
for res in responses:
print(f"API 응답: {res}")
asyncio.run(main())
gather는 내부적으로 코루틴을 Task로 변환하므로 create_task를 명시적으로 호출하지 않아도 됩니다. 동적 개수의 작업을 처리할 때는 리스트 언패킹(*list)을 활용하면 됩니다.
4. Future 객체와 블로킹 작업의 통합
4.1 Future의 개념
Future는 아직 완료되지 않은 비동기 작업의 결과를 담는 그릇(Placeholder)입니다. 초기 상태는 Pending이며, 작업이 완료되면 결과나 예외가 설정되면서 Done 상태로 전환됩니다. Task는 Future의 하위 클래스로, 코루틴 실행을 관리하는 특수한 Future입니다.
4.2 동기 블로킹 코드와 이벤트 루프의 결합 (run_in_executor)
비동기 환경에서 어쩔 수 없이 동기 블로킹 라이브러리(예: requests, 동기 DB 드라이버, time.sleep)를 사용해야 한다면, 이를 스레드 풀이나 프로세스 풀에서 실행하여 이벤트 루프가 멈추는 것을 방지해야 합니다. 이때 run_in_executor와 Future가 결합됩니다.
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def heavy_cpu_bound_task(future_obj):
"""스레드 풀에서 실행될 동기 블로킹 작업"""
time.sleep(4) # 동기 블로킹
future_obj.set_result("무거운 작업 완료 결과")
async def async_wrapper():
print("비동기 래퍼 시작")
loop = asyncio.get_running_loop()
# Future 객체 생성
future = loop.create_future()
executor = ThreadPoolExecutor(max_workers=2)
# 스레드 풀에 작업 할당 (이벤트 루프 블로킹 방지)
loop.run_in_executor(executor, heavy_cpu_bound_task, future)
# Future가 완료될 때까지 비동기적으로 대기
result = await future
print(f"비동기 래퍼 종료, 결과: {result}")
return result
async def lightweight_task():
print("가벼운 비동기 작업 시작")
await asyncio.sleep(1)
print("가벼운 비동기 작업 종료")
return "Light"
async def orchestrator():
# 무거운 작업과 가벼운 작업을 동시에 실행
results = await asyncio.gather(
async_wrapper(),
lightweight_task()
)
print(f"최종 결과: {results}")
asyncio.run(orchestrator())
4.3 실행 흐름 분석
orchestrator가async_wrapper와lightweight_task를gather로 동시에 스케줄링합니다.async_wrapper는Future를 생성하고 블로킹 작업을 스레드 풀에 넘긴 후,await future에서 제어권을 이벤트 루프에 반납하고 대기(Pending) 상태가 됩니다.- 이벤트 루프는 즉시
lightweight_task를 실행합니다. 1초 후 이 작업이 완료됩니다. - 4초 후, 스레드 풀의 작업이 끝나고
future_obj.set_result()를 호출하여 Future의 상태를 Done으로 변경합니다. - 이벤트 루프는 Future의 상태 변화를 감지하고
async_wrapper를 깨워 남은 코드를 실행합니다.
이 패턴의 핵심은 Future가 단순한 결과 저장소가 아니라, 동기 스레드와 비동기 이벤트 루프를 연결하는 브릿지 역할을 한다는 점입니다. await가 없다면 이벤트 루프는 동기적으로 Future의 완료를 폴링(Polling)하게 되어 블로킹이 발생하므로, 반드시 await를 통해 협력적 양보 지점을 만들어야 합니다.