.thumb up, please!
문제
한때, Python의 멀티스레드 성능은 GIL로 인해 기대치에 미치지 못했습니다.
이에 3.4 버전부터 Python은 IO-bound 작업을 동시적으로 실행할 수 있는 asyncio 패키지를 도입했습니다. 여러 차례의 업데이트를 통해 asyncio API는 매우 우수한 성능을 보였고, 멀티스레드 버전보다 동시 작업의 성능이 크게 향상되었습니다.
그нако, 개발자들은 asyncio를 사용할 때 여전히 많은 오류를 mắc습니다:
하나의 오류 예제는await coroutine 방법을 통해 직접 호출하면, 동시 작업의 호출을 동기화로 전환시켜 최종적으로 동시성 특성을 상실하게 합니다.
async def main():
result_1 = await some_coro("name-1")
result_2 = await some_coro("name-2")
다른 오류 예제는아래와 같습니다. 개발자가 뒤로 실행되는 작업을 만들기 위해create_task를 사용하는 것은 인지했지만, 아래와 같이 작업을 차례로 기다리는 방법은 동시에 실행되는 작업을 순차적으로 기다리게 합니다.
async def main():
task_1 = asyncio.create_task(some_coro("name-1"))
task_2 = asyncio.create_task(some_coro("name-2"))
result_1 = await task_1
result_2 = await task_2
이 코드는 task_1이 완료될 때까지 task_2의 완료 여부와 상관없이 task_1을 기다립니다.
동시 작업 실행이란?
여기서 동시 작업이란 무엇일까요? 아래 그림을 통해 이해할 수 있습니다:
그림은 동시 프로세스가 두 부분으로 구성된다는 것을 보여줍니다: 뒤로 작업을 시작하고, 뒤로 작업을 주 함수로 다시 넣으며 결과를 획득하는 과정.
绝大多數讀者が 이미 create_task를 사용하여 뒤로 작업을 시작하는 방법을 알고 있습니다. 오늘은 뒤로 작업이 완료되는 방법들 중几种를 소개하고, 각 방법의 최선책을 공유할 것입니다.
시작
다양한 방법을 설명하기 전에, 동시 작업을 시뮬레이션하는 예제 async 방법과 예외를 친절하게 알리기 위해 사용할 custom AsyncException을 준비합니다:
from random import random, randint
import asyncio
class AsyncException(Exception):
def __init__(self, message, *args, **kwargs):
self.message = message
super(*args, **kwargs)
def __str__(self):
return self.message
async def exampleCoroutine(name):
print(f"코루틴 {name} 시작")
value = random()
지연 = randint(1, 4)
await asyncio.sleep(지연)
if value > 0.5:
raise AsyncException(f"어떤 나쁜 일이 {지연}초 후 발생했습니다.")
print(f"코루틴 {name} 완료. {지연}초 지연 있었습니다.")
return value
동시 작업 실행 방법 비교
1. asyncio.gather
asyncio.gather는 여러 뒤로 작업을 시작하고, 완료될 때까지 기다렸다가 결과 목록을 반환합니다:
async def main():
tasks, outputs = [], []
for i in range(3):
tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))
outputs = await asyncio.gather(*tasks) # 리스트 언패킹 필수
for output in outputs:
print(f">결과값 : {output}")
asyncio.run(main())
asyncio.gather는 리스트나 컬렉션을 직접 인수로 받을 수 없습니다. 만약 리스트를 전달하려면 반드시 언패킹이 필요합니다.
asyncio.gather는 또한return_exceptions 매개변수를 받습니다. 이 매개변수가 False일 때, 뒤로 작업 중 하나라도 예외를 일으키면 gather 호출자에게 전파됩니다. 이 때 gather의 결과 목록은 비어 있습니다.
async def main():
tasks, outputs = [], []
for i in range(3):
tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))
try:
outputs = await asyncio.gather(*tasks, return_exceptions=False) # 리스트 언패킹 필수
except AsyncException as e:
print(e)
for output in outputs:
print(f">결과값 : {output}")
asyncio.run(main())
return_exceptions 매개변수가 True일 때, 뒤로 작업에서 발생한 예외는 다른 작업의 실행을 방해하지 않고, 최종적으로 결과 목록에 함께 반환됩니다.
outputs = await asyncio.gather(*tasks, return_exceptions=True)
다음, 왜 gather가 리스트를 직접 받을 수 없고 반드시 언패킹이 필요한지 살펴보겠습니다. 만약 리스트가 채워지면 작업을 추가로 할 때, 리스트를 채운 후에 작업을 추가하기가 어렵습니다. 그러나 gather는 작업 그룹을 통해 중간에 새로운 작업을 추가할 수 있습니다:
async def main():
tasks, outputs = [], []
for i in range(3):
tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))
group_1 = asyncio.gather(*tasks) # note we don't use await now
# 특정 상황이 발생하면 새로운 작업을 추가할 수 있습니다
group_2 = asyncio.gather(group_1, asyncio.create_task(exampleCoroutine("새로운 작업")))
outputs = await group_2
for output in outputs:
print(f">결과값 : {output}")
asyncio.run(main())
그러나 gather는 직접 timeout 매개변수를 받지 않습니다. 만약 모든 작업에 대해 일정시간 제한을 걸고 싶다면 아래와 같이 asyncio.wait_for를 사용해야 합니다. 그러나 이 방법은 그리 우아하지 않습니다.
async def main():
tasks, outputs = [], []
for i in range(3):
tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))
outputs = await asyncio.wait_for(asyncio.gather(*tasks), timeout=2)
for output in outputs:
print(f">결과값 : {output}")
asyncio.run(main())
2. asyncio.as_completed
때로는 한 작업이 완료된 직후mediately 다른 작업을 수행해야 합니다. 예를 들어 데이터를 수집한 후 머신러닝 모델에 즉시 입력하는 경우, gather 방법만으로는 충분하지 않습니다. 이럴 때 사용할 수 있는 방법이asyncio.as_completed입니다.
asyncio.as_completed의 소스 코드를 살펴보겠습니다:
# 이는 *아ync* 메서드가 아닙니다! 이것은 반복자(yielding Futures)입니다.
def as_completed(fs, *, timeout=None):
# ...
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
소스 코드가 보여주듯, as_completed는 아ync 메서드가 아니라 반복자로, 작업 완료를 차례로 확인합니다. 따라서 아래와 같이 각 작업이 완료될 때마다 즉시 처리할 수 있습니다:
async def main():
tasks = []
for i in range(5):
tasks.append(asyncio.create_task(exampleCoroutine(f"name-{i}")))
for done in asyncio.as_completed(tasks): # 리스트 언패킹 필요 없음
try:
result = await done
print(f">결과값 : {result}")
except AsyncException as e:
print(e)
asyncio.run(main())
as_completed는 timeout 매개변수를 받으며, 제한시간이 초과할 경우 asyncio.TimeoutError가 일어납니다:
async def main():
tasks = []
for i in range(5):
tasks.append(asyncio.create_task(exampleCoroutine(f"name-{i}")))
for done in asyncio.as_completed(tasks, timeout=2): # 리스트 언패킹 필요 없음
try:
result = await done
print(f">결과값 : {result}")
except AsyncException as e:
print(e)
except asyncio.TimeoutError: # TimeoutError를 처리해야 합니다
print("시간초과.")
asyncio.run(main())
as_complete는 작업 실행 결과 처리 측면에서 gather보다 유연합니다. 그러나 기존 작업 목록에 새로운 작업을 추가하는 데는 어려움이 있습니다.
3. asyncio.wait
asyncio.wait의 호출 방식은 as_completed와 유사하지만, 완료된 작업과 미완료 작업을 담는 두 개의 컬렉션을 반환합니다. 완료된 작업은done에, 아직 실행 중인 작업은pending에 저장됩니다.
asyncio.wait는 세 가지 중 한가지를 선택할 수 있는return_when 매개변수를 받습니다:
- return_when이asyncio.ALL_COMPLETED일 때, done에 모든 완료된 작업이 저장되고, pending는 비어 있습니다.
- return_when이asyncio.FIRST_COMPLETED일 때, done에 완료된 작업이 저장되고, pending에는 실행 중인 작업이 저장됩니다.
async def main():
tasks = set()
for i in range(5):
tasks.add(asyncio.create_task(exampleCoroutine(f"name-{i}")))
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
try:
result = await task
print(f">결과값 : {result}")
except AsyncException as e:
print(e)
print(f"pending 작업의 길이: {len(pending)}")
asyncio.run(main())
- return_when이asyncio.FIRST_EXCEPTION일 때, done에 예외를 일으킨 후 완료된 작업이 저장되고, pending에는 아직 실행 중인 작업이 저장됩니다.
return_when이asyncio.FIRST_COMPLETED 또는asyncio.FIRST_EXEPTION일 때, asyncio.wait를 재귀적으로 호출하여 새로운 작업을 추가하고, 상황에 따라 모든 작업을 완료할 수 있습니다:
async def main():
pending = set()
for i in range(5):
pending.add(asyncio.create_task(exampleCoroutine(f"name-{i}"))) # 작업 목록의 유형과 이름을 변경
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
for task in done:
try:
result = await task
print(f">결과값 : {result}")
except AsyncException as e:
print(e)
pending.add(asyncio.create_task(exampleCoroutine("새로운 작업")))
print(f"pending 작업의 길이: {len(pending)}")
asyncio.run(main())
4. asyncio.TaskGroup
Python 3.11부터 asyncio는 새로운 TaskGroup API를 도입하여, Python이 구조화된 동시성을 지원하게 되었습니다. 이 기능은 동시 작업의 수명 주기를 더 파이썬 다운 방법으로 관리할 수 있도록 합니다.
정리
이 기사는 asyncio.gather, asyncio.as_completed, asyncio.wait API와 Python 3.11에서 도입된 asyncio.TaskGroup 기능을绍介했습니다.
실제 요구에 맞는 작업 관리 방식을 사용하면 우리의 asyncio 동시 프로그래밍이 더 유연해질 수 있습니다.
이 기사는 여러개의 플랫폼에서 발행되었습니다.