이 방법을 사용하여 Python의 동시 작업 실행을 개선하세요

.thumb up, please!

문제

한때, Python의 멀티스레드 성능은 GIL로 인해 기대치에 미치지 못했습니다.

이에 3.4 버전부터 Python은 IO-bound 작업을 동시적으로 실행할 수 있는 asyncio 패키지를 도입했습니다. 여러 차례의 업데이트를 통해 asyncio API는 매우 우수한 성능을 보였고, 멀티스레드 버전보다 동시 작업의 성능이 크게 향상되었습니다.

그нако, 개발자들은 asyncio를 사용할 때 여전히 많은 오류를 mắc습니다:

하나의 오류 예제는await coroutine 방법을 통해 직접 호출하면, 동시 작업의 호출을 동기화로 전환시켜 최종적으로 동시성 특성을 상실하게 합니다.

async def main():
    result_1 = await some_coro("name-1")
    result_2 = await some_coro("name-2")

다른 오류 예제는아래와 같습니다. 개발자가 뒤로 실행되는 작업을 만들기 위해create_task를 사용하는 것은 인지했지만, 아래와 같이 작업을 차례로 기다리는 방법은 동시에 실행되는 작업을 순차적으로 기다리게 합니다.

async def main():
    task_1 = asyncio.create_task(some_coro("name-1"))
    task_2 = asyncio.create_task(some_coro("name-2"))
    
    result_1 = await task_1
    result_2 = await task_2

이 코드는 task_1이 완료될 때까지 task_2의 완료 여부와 상관없이 task_1을 기다립니다.

동시 작업 실행이란?

여기서 동시 작업이란 무엇일까요? 아래 그림을 통해 이해할 수 있습니다:

그림은 동시 프로세스가 두 부분으로 구성된다는 것을 보여줍니다: 뒤로 작업을 시작하고, 뒤로 작업을 주 함수로 다시 넣으며 결과를 획득하는 과정.

绝大多數讀者が 이미 create_task를 사용하여 뒤로 작업을 시작하는 방법을 알고 있습니다. 오늘은 뒤로 작업이 완료되는 방법들 중几种를 소개하고, 각 방법의 최선책을 공유할 것입니다.

시작

다양한 방법을 설명하기 전에, 동시 작업을 시뮬레이션하는 예제 async 방법과 예외를 친절하게 알리기 위해 사용할 custom AsyncException을 준비합니다:

from random import random, randint
import asyncio


class AsyncException(Exception):
    def __init__(self, message, *args, **kwargs):
        self.message = message
        super(*args, **kwargs)

    def __str__(self):
        return self.message


async def exampleCoroutine(name):
    print(f"코루틴 {name} 시작")
    value = random()

    지연 = randint(1, 4)
    await asyncio.sleep(지연)
    if value > 0.5:
        raise AsyncException(f"어떤 나쁜 일이 {지연}초 후 발생했습니다.")
    print(f"코루틴 {name} 완료. {지연}초 지연 있었습니다.")
    return value

동시 작업 실행 방법 비교

1. asyncio.gather

asyncio.gather는 여러 뒤로 작업을 시작하고, 완료될 때까지 기다렸다가 결과 목록을 반환합니다:

async def main():
    tasks, outputs = [], []
    for i in range(3):
        tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))

    outputs = await asyncio.gather(*tasks)  # 리스트 언패킹 필수
    for output in outputs:
        print(f">결과값 : {output}")

asyncio.run(main())

asyncio.gather는 리스트나 컬렉션을 직접 인수로 받을 수 없습니다. 만약 리스트를 전달하려면 반드시 언패킹이 필요합니다.

asyncio.gather는 또한return_exceptions 매개변수를 받습니다. 이 매개변수가 False일 때, 뒤로 작업 중 하나라도 예외를 일으키면 gather 호출자에게 전파됩니다. 이 때 gather의 결과 목록은 비어 있습니다.

async def main():
    tasks, outputs = [], []
    for i in range(3):
        tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))

    try:
        outputs = await asyncio.gather(*tasks, return_exceptions=False)  # 리스트 언패킹 필수
    except AsyncException as e:
        print(e)
    for output in outputs:
        print(f">결과값 : {output}")

asyncio.run(main())

return_exceptions 매개변수가 True일 때, 뒤로 작업에서 발생한 예외는 다른 작업의 실행을 방해하지 않고, 최종적으로 결과 목록에 함께 반환됩니다.

outputs = await asyncio.gather(*tasks, return_exceptions=True)

다음, 왜 gather가 리스트를 직접 받을 수 없고 반드시 언패킹이 필요한지 살펴보겠습니다. 만약 리스트가 채워지면 작업을 추가로 할 때, 리스트를 채운 후에 작업을 추가하기가 어렵습니다. 그러나 gather는 작업 그룹을 통해 중간에 새로운 작업을 추가할 수 있습니다:

async def main():
    tasks, outputs = [], []
    for i in range(3):
        tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))
    group_1 = asyncio.gather(*tasks)  # note we don't use await now
    # 특정 상황이 발생하면 새로운 작업을 추가할 수 있습니다
    group_2 = asyncio.gather(group_1, asyncio.create_task(exampleCoroutine("새로운 작업")))
    outputs = await group_2
    for output in outputs:
        print(f">결과값 : {output}")

asyncio.run(main())

그러나 gather는 직접 timeout 매개변수를 받지 않습니다. 만약 모든 작업에 대해 일정시간 제한을 걸고 싶다면 아래와 같이 asyncio.wait_for를 사용해야 합니다. 그러나 이 방법은 그리 우아하지 않습니다.

async def main():
    tasks, outputs = [], []
    for i in range(3):
        tasks.append(asyncio.create_task(exampleCoroutine(f'name-{i}')))

    outputs = await asyncio.wait_for(asyncio.gather(*tasks), timeout=2)
    for output in outputs:
        print(f">결과값 : {output}")

asyncio.run(main())

2. asyncio.as_completed

때로는 한 작업이 완료된 직후mediately 다른 작업을 수행해야 합니다. 예를 들어 데이터를 수집한 후 머신러닝 모델에 즉시 입력하는 경우, gather 방법만으로는 충분하지 않습니다. 이럴 때 사용할 수 있는 방법이asyncio.as_completed입니다.

asyncio.as_completed의 소스 코드를 살펴보겠습니다:

# 이는 *아ync* 메서드가 아닙니다! 이것은 반복자(yielding Futures)입니다.
def as_completed(fs, *, timeout=None):
  # ...
  for f in todo:
      f.add_done_callback(_on_completion)
  if todo and timeout is not None:
      timeout_handle = loop.call_later(timeout, _on_timeout)
  for _ in range(len(todo)):
      yield _wait_for_one()

소스 코드가 보여주듯, as_completed는 아ync 메서드가 아니라 반복자로, 작업 완료를 차례로 확인합니다. 따라서 아래와 같이 각 작업이 완료될 때마다 즉시 처리할 수 있습니다:

async def main():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(exampleCoroutine(f"name-{i}")))

    for done in asyncio.as_completed(tasks):  # 리스트 언패킹 필요 없음
        try:
            result = await done
            print(f">결과값 : {result}")
        except AsyncException as e:
            print(e)

asyncio.run(main())

as_completed는 timeout 매개변수를 받으며, 제한시간이 초과할 경우 asyncio.TimeoutError가 일어납니다:

async def main():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(exampleCoroutine(f"name-{i}")))

    for done in asyncio.as_completed(tasks, timeout=2):  # 리스트 언패킹 필요 없음
        try:
            result = await done
            print(f">결과값 : {result}")
        except AsyncException as e:
            print(e)
        except asyncio.TimeoutError: # TimeoutError를 처리해야 합니다
            print("시간초과.")

asyncio.run(main())

as_complete는 작업 실행 결과 처리 측면에서 gather보다 유연합니다. 그러나 기존 작업 목록에 새로운 작업을 추가하는 데는 어려움이 있습니다.

3. asyncio.wait

asyncio.wait의 호출 방식은 as_completed와 유사하지만, 완료된 작업과 미완료 작업을 담는 두 개의 컬렉션을 반환합니다. 완료된 작업은done에, 아직 실행 중인 작업은pending에 저장됩니다.

asyncio.wait는 세 가지 중 한가지를 선택할 수 있는return_when 매개변수를 받습니다:

  • return_when이asyncio.ALL_COMPLETED일 때, done에 모든 완료된 작업이 저장되고, pending는 비어 있습니다.
  • return_when이asyncio.FIRST_COMPLETED일 때, done에 완료된 작업이 저장되고, pending에는 실행 중인 작업이 저장됩니다.
async def main():
    tasks = set()
    for i in range(5):
        tasks.add(asyncio.create_task(exampleCoroutine(f"name-{i}")))

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        try:
            result = await task
            print(f">결과값 : {result}")
        except AsyncException as e:
            print(e)
    print(f"pending 작업의 길이: {len(pending)}")

asyncio.run(main())

  • return_when이asyncio.FIRST_EXCEPTION일 때, done에 예외를 일으킨 후 완료된 작업이 저장되고, pending에는 아직 실행 중인 작업이 저장됩니다.

return_when이asyncio.FIRST_COMPLETED 또는asyncio.FIRST_EXEPTION일 때, asyncio.wait를 재귀적으로 호출하여 새로운 작업을 추가하고, 상황에 따라 모든 작업을 완료할 수 있습니다:

async def main():
    pending = set()
    for i in range(5):
        pending.add(asyncio.create_task(exampleCoroutine(f"name-{i}")))  # 작업 목록의 유형과 이름을 변경

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in done:
            try:
                result = await task
                print(f">결과값 : {result}")
            except AsyncException as e:
                print(e)
                pending.add(asyncio.create_task(exampleCoroutine("새로운 작업")))
    print(f"pending 작업의 길이: {len(pending)}")

asyncio.run(main())

4. asyncio.TaskGroup

Python 3.11부터 asyncio는 새로운 TaskGroup API를 도입하여, Python이 구조화된 동시성을 지원하게 되었습니다. 이 기능은 동시 작업의 수명 주기를 더 파이썬 다운 방법으로 관리할 수 있도록 합니다.

정리

이 기사는 asyncio.gather, asyncio.as_completed, asyncio.wait API와 Python 3.11에서 도입된 asyncio.TaskGroup 기능을绍介했습니다.

실제 요구에 맞는 작업 관리 방식을 사용하면 우리의 asyncio 동시 프로그래밍이 더 유연해질 수 있습니다.

이 기사는 여러개의 플랫폼에서 발행되었습니다.

태그: asynchronous programming asyncio concurrency python

5월 22일 23:07에 게시됨