처리 시간이 오래 걸리는 작업은 일반적으로 비동기 방식으로 처리됩니다. 이러한 비동기 처리 방식은 Future 패턴이라고도 불립니다.
일반적인 처리 흐름 비동기 API를 요청하면, 시스템은 즉시 응답을 반환하며 작업이 시작되었음을 알려줍니다. 이 응답에는 일반적으로 결과를 추적하기 위한 작업 ID와 함께 결과 조회를 위한 엔드포인트가 포함됩니다. 결과가 아직 처리되지 않은 경우 조회 API는 "미완료" 상태를 반환하고, 처리가 완료되면 해당 데이터를 반환합니다.
처리 방법 비동기 API는 일반적으로 폴링(polling) 방식을 사용합니다. 일정한 간격으로 결과 조회 API를 요청하여 상태가 완료되거나 지정된 데이터를 얻을 때까지 또는 타임아웃될 때까지 반복합니다.
비동기 API가 추적 ID와 조회 엔드포인트를 제공하지 않는 경우, 동일한 방식으로 데이터베이스나 로그 데이터를 주기적으로 조회하여 지정된 결과를 얻거나 타임아웃될 때까지 반복할 수 있습니다.
예시 API
- 작업 생성 API
요청 주소
http://api.example.com/v1/tasks/create/
요청 방식 POST
요청 형식 JSON
| 파라미터 | 타입 | 설명 |
|---|---|---|
| user_id | String | 사용자 식별자 |
| task_type | String | 작업 유형 |
| payload | Object | 작업 데이터 |
| priority | int | 우선순위 (1-5) |
응답 예시 파라미터 누락:
{
"status": "error",
"message": "필수 파라미터가 누락되었습니다"
}
성공:
{
"status": "accepted",
"task_id": "task_12345"
}
- 작업 결과 조회 API
요청 주소
http://api.example.com/v1/tasks/status?task_id=***
요청 방식 GET
| 파라미터 | 타입 | 설명 |
|---|---|---|
| task_id | String | 작업 ID |
응답 예시 처리 중:
{
"status": "processing"
}
완료:
{
"status": "completed",
"result": {
"output_data": "작업 결과 데이터",
"processing_time": "5.2s"
}
}
파이썬 구현 방법
import time
import requests
import json
class AsyncTaskTester:
def __init__(self, base_url="http://api.example.com/v1/"):
self.base_url = base_url
self.max_retries = 60
self.polling_interval = 2
def initiate_task(self, user_id, task_type, payload, priority=3):
"""새로운 비동기 작업을 시작합니다"""
endpoint = f"{self.base_url}tasks/create/"
headers = {"Content-Type": "application/json"}
data = {
"user_id": user_id,
"task_type": task_type,
"payload": payload,
"priority": priority
}
try:
response = requests.post(endpoint, headers=headers, data=json.dumps(data))
response.raise_for_status()
return response.json().get("task_id")
except requests.exceptions.RequestException as e:
print(f"작업 생성 실패: {e}")
return None
def fetch_task_result(self, task_id):
"""작업 결과를 폴링 방식으로 조회합니다"""
endpoint = f"{self.base_url}tasks/status"
params = {"task_id": task_id}
start_time = time.time()
elapsed = 0
while elapsed < self.max_retries * self.polling_interval:
try:
response = requests.get(endpoint, params=params)
response.raise_for_status()
result = response.json()
if result.get("status") == "completed":
return result.get("result")
elif result.get("status") == "processing":
print(f"작업 처리 중... ({elapsed//self.polling_interval + 1}회 시도)")
else:
print(f"예상치 못한 상태: {result.get('status')}")
except requests.exceptions.RequestException as e:
print(f"요청 실패: {e}")
time.sleep(self.polling_interval)
elapsed = time.time() - start_time
print(f"최대 시도 횟수({self.max_retries}회)를 초과했습니다")
return None
def main():
tester = AsyncTaskTester()
# 테스트할 작업 데이터
test_payload = {
"input_data": "테스트 입력",
"parameters": {
"option1": "value1",
"option2": "value2"
}
}
# 작업 시작
task_id = tester.initiate_task(
user_id="test_user_001",
task_type="data_processing",
payload=test_payload,
priority=1
)
if task_id:
print(f"작업이 시작되었습니다. ID: {task_id}")
# 결과 조회
result = tester.fetch_task_result(task_id)
if result:
print("작업 결과:")
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print("작업 결과를 가져오지 못했습니다")
else:
print("작업을 시작할 수 없습니다")
if __name__ == "__main__":
main()