Celery를 활용한 분산 비동기 작업 처리

1. Celery 개요

Celery는 파이썬 기반의 분산 비동기 작업 처리 프레임워크입니다. 이름은 '샐러리(salary)'가 아닌 채소인 '셀러리(celery)'에서 유래했으며, 주로 장시간 소요되거나 즉시 실행이 필요 없는 작업들을 백그라운드에서 처리하기 위해 사용됩니다.

주요 용도:

  • 비동기 작업 처리: HTTP 요청과 같은 실시간 처리 외부 작업을 별도 프로세스에서 수행하여 응답 속도 향상
  • 지연 작업 실행: 특정 시간 후에 실행되어야 하는 작업 예약 (예: 10분 후 알림 전송)
  • 정기 작업 스케줄링: 주기적으로 반복 실행되는 작업 관리 (예: 매일 오전 9시 리포트 생성)

구조 구성 요소:

  • Broker (메시지 브로커): 작업 큐 역할을 하며, Redis 또는 RabbitMQ와 같은 외부 서비스를 사용합니다.
  • Worker: 브로커에서 작업을 가져와 실제로 실행하는 프로세스입니다.
  • Result Backend: 작업 실행 결과를 저장하는 저장소로, Redis, 데이터베이스 등을 사용할 수 있습니다.

2. 기본 사용법

먼저 설치합니다:

pip install celery

다음은 간단한 예제 코드입니다:

# tasks.py
from celery import Celery
import time

# Redis를 브로커와 백엔드로 설정
app = Celery(
    'demo',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@app.task
def multiply(x, y):
    time.sleep(3)  # 시뮬레이션된 지연
    result = x * y
    print(f"결과: {result}")
    return result

작업 실행 방법:

# 작업 제출
from tasks import multiply
result = multiply.delay(4, 5)
print("작업 ID:", result.id)

Worker 시작 (Linux/Mac):

celery -A tasks worker -l info

Windows 사용자는 eventlet 추가 필요:

celery -A tasks worker -l info -P eventlet

결과 확인:

from celery.result import AsyncResult
from tasks import app

res = AsyncResult(result.id, app=app)
if res.ready():
    if res.successful():
        print("성공:", res.get())
    else:
        print("실패:", res.info)
else:
    print("대기 중")

3. 모듈화된 패키지 구조

재사용 가능한 구조로 구성해 보겠습니다:

project/
├── celery_app/
│   ├── __init__.py
│   ├── config.py
│   ├── core.py
│   ├── auth_tasks.py
│   └── data_tasks.py
├── manage.py
└── scheduler.py

core.py - 앱 초기화:

from celery import Celery

app = Celery('project')
app.config_from_object('celery_app.config')

# 태스크 자동 로드
app.autodiscover_tasks(['celery_app.auth_tasks', 'celery_app.data_tasks'])

config.py - 설정 정의:

broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# 태스크 직렬화 형식
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Seoul'
enable_utc = False

Worker 실행:

celery -A celery_app.core worker -l info

4. 고급 작업 유형

다양한 작업 실행 방식:

# 즉시 비동기 실행
multiply.delay(3, 7)

# 지정 시간 이후 실행 (ETA)
from datetime import datetime, timedelta
eta_time = datetime.utcnow() + timedelta(minutes=2)
multiply.apply_async(args=[3, 7], eta=eta_time)

# 주기적 작업 설정 (Beat Scheduler)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'daily-cleanup': {
        'task': 'celery_app.data_tasks.cleanup_old_records',
        'schedule': crontab(hour=2, minute=0),  # 매일 새벽 2시
    },
    'hourly-stats': {
        'task': 'celery_app.data_tasks.generate_hourly_report',
        'schedule': crontab(minute=30),  # 매시 30분
    },
}

Beat 스케줄러 실행:

celery -A celery_app.core beat -l info

5. Django 통합

Django 프로젝트에서 사용 시 설정:

# celery_app/core.py
import os
from django.apps import apps
from celery import Celery

# Django 환경 설정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# 앱 준비
apps.populate()

tasks.py 예제:

@app.task
def send_notification(user_id, message):
    from myapp.models import User
    user = User.objects.get(id=user_id)
    # 이메일 전송 로직
    return f"Notified {user.email}"

뷰에서 호출:

def api_view(request):
    task = send_notification.delay(123, "Hello!")
    return JsonResponse({'task_id': task.id})

6. 실시간 대응 시나리오: 한정판 판매 시스템

프론트엔드 로직:

function startSale() {
  axios.post('/api/sale/start/')
    .then(response => {
      const taskId = response.data.task_id;
      const interval = setInterval(() => {
        axios.get(`/api/sale/status/${taskId}/`)
          .then(res => {
            if (res.data.status === 'SUCCESS') {
              alert('구매 성공!');
              clearInterval(interval);
            } else if (res.data.status === 'FAILURE') {
              alert('매진되었습니다.');
              clearInterval(interval);
            }
          });
      }, 3000);
    });
}

백엔드 구현:

@app.task
def process_sale_task(user_id):
    try:
        with transaction.atomic():
            product = Product.objects.select_for_update().get(id=1)
            if product.stock > 0:
                product.stock -= 1
                product.save()
                Order.objects.create(user_id=user_id, product=product)
                return {'success': True, 'msg': '구매 완료'}
            else:
                return {'success': False, 'msg': '품절'}
    except Exception as e:
        return {'success': False, 'msg': str(e)}

# API 뷰
class StartSaleView(APIView):
    def post(self, request):
        task = process_sale_task.delay(request.user.id)
        return Response({'task_id': task.id})

class SaleStatusView(APIView):
    def get(self, request, task_id):
        result = AsyncResult(task_id, app=app)
        if result.ready():
            data = result.get()
            status = 'SUCCESS' if data['success'] else 'FAILURE'
        else:
            status = 'PENDING'
        return Response({'status': status})

7. 캐시 일관성 유지 전략

7.1 캐싱 도입

API 응답 속도 향상을 위한 캐싱:

class BannerListView(APIView):
    def get(self, request):
        cache_key = 'homepage_banners'
        cached_data = cache.get(cache_key)
        
        if cached_data:
            return Response(cached_data)
            
        banners = Banner.objects.filter(active=True).order_by('priority')
        serializer = BannerSerializer(banners, many=True)
        serialized_data = serializer.data
        
        # CDN 호스트 추가
        base_url = settings.CDN_URL
        for item in serialized_data:
            if item['image']:
                item['image'] = f"{base_url}{item['image']}"
                
        cache.set(cache_key, serialized_data, timeout=60*60)
        return Response(serialized_data)

7.2 일정 기반 캐시 갱신

데이터베이스 변경 시 즉각적인 캐시 무효화가 어려운 경우, 주기적 갱신으로 일관성 확보:

@app.task
def refresh_homepage_cache():
    cache_key = 'homepage_banners'
    try:
        banners = Banner.objects.filter(active=True).order_by('priority')
        serializer = BannerSerializer(banners, many=True)
        data = serializer.data
        
        base_url = settings.CDN_URL
        for item in data:
            if item['image']:
                item['image'] = f"{base_url}{item['image']}"
                
        cache.set(cache_key, data, timeout=60*60)
        return f"Cache updated at {datetime.now()}"
    except Exception as e:
        return f"Update failed: {str(e)}"

# 스케줄 등록
app.conf.beat_schedule.update({
    'refresh-banners': {
        'task': 'celery_app.data_tasks.refresh_homepage_cache',
        'schedule': timedelta(minutes=30),
    }
})

이 방식은 짧은 시간 동안 불일치 상태가 존재할 수 있지만, 대부분의 웹 서비스에서 수용 가능한 trade-off입니다.

태그: celery Redis Django asynchronous-tasks task-queue

5월 27일 08:34에 게시됨