1. Celery 개요
Celery는 파이썬 기반의 분산 비동기 작업 처리 프레임워크입니다. 이름은 '샐러리(salary)'가 아닌 채소인 '셀러리(celery)'에서 유래했으며, 주로 장시간 소요되거나 즉시 실행이 필요 없는 작업들을 백그라운드에서 처리하기 위해 사용됩니다.
주요 용도:
- 비동기 작업 처리: HTTP 요청과 같은 실시간 처리 외부 작업을 별도 프로세스에서 수행하여 응답 속도 향상
- 지연 작업 실행: 특정 시간 후에 실행되어야 하는 작업 예약 (예: 10분 후 알림 전송)
- 정기 작업 스케줄링: 주기적으로 반복 실행되는 작업 관리 (예: 매일 오전 9시 리포트 생성)
구조 구성 요소:
- Broker (메시지 브로커): 작업 큐 역할을 하며, Redis 또는 RabbitMQ와 같은 외부 서비스를 사용합니다.
- Worker: 브로커에서 작업을 가져와 실제로 실행하는 프로세스입니다.
- Result Backend: 작업 실행 결과를 저장하는 저장소로, Redis, 데이터베이스 등을 사용할 수 있습니다.
2. 기본 사용법
먼저 설치합니다:
pip install celery
다음은 간단한 예제 코드입니다:
# tasks.py
from celery import Celery
import time
# Redis를 브로커와 백엔드로 설정
app = Celery(
'demo',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
@app.task
def multiply(x, y):
time.sleep(3) # 시뮬레이션된 지연
result = x * y
print(f"결과: {result}")
return result
작업 실행 방법:
# 작업 제출
from tasks import multiply
result = multiply.delay(4, 5)
print("작업 ID:", result.id)
Worker 시작 (Linux/Mac):
celery -A tasks worker -l info
Windows 사용자는 eventlet 추가 필요:
celery -A tasks worker -l info -P eventlet
결과 확인:
from celery.result import AsyncResult
from tasks import app
res = AsyncResult(result.id, app=app)
if res.ready():
if res.successful():
print("성공:", res.get())
else:
print("실패:", res.info)
else:
print("대기 중")
3. 모듈화된 패키지 구조
재사용 가능한 구조로 구성해 보겠습니다:
project/
├── celery_app/
│ ├── __init__.py
│ ├── config.py
│ ├── core.py
│ ├── auth_tasks.py
│ └── data_tasks.py
├── manage.py
└── scheduler.py
core.py - 앱 초기화:
from celery import Celery
app = Celery('project')
app.config_from_object('celery_app.config')
# 태스크 자동 로드
app.autodiscover_tasks(['celery_app.auth_tasks', 'celery_app.data_tasks'])
config.py - 설정 정의:
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# 태스크 직렬화 형식
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Seoul'
enable_utc = False
Worker 실행:
celery -A celery_app.core worker -l info
4. 고급 작업 유형
다양한 작업 실행 방식:
# 즉시 비동기 실행
multiply.delay(3, 7)
# 지정 시간 이후 실행 (ETA)
from datetime import datetime, timedelta
eta_time = datetime.utcnow() + timedelta(minutes=2)
multiply.apply_async(args=[3, 7], eta=eta_time)
# 주기적 작업 설정 (Beat Scheduler)
from celery.schedules import crontab
app.conf.beat_schedule = {
'daily-cleanup': {
'task': 'celery_app.data_tasks.cleanup_old_records',
'schedule': crontab(hour=2, minute=0), # 매일 새벽 2시
},
'hourly-stats': {
'task': 'celery_app.data_tasks.generate_hourly_report',
'schedule': crontab(minute=30), # 매시 30분
},
}
Beat 스케줄러 실행:
celery -A celery_app.core beat -l info
5. Django 통합
Django 프로젝트에서 사용 시 설정:
# celery_app/core.py
import os
from django.apps import apps
from celery import Celery
# Django 환경 설정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# 앱 준비
apps.populate()
tasks.py 예제:
@app.task
def send_notification(user_id, message):
from myapp.models import User
user = User.objects.get(id=user_id)
# 이메일 전송 로직
return f"Notified {user.email}"
뷰에서 호출:
def api_view(request):
task = send_notification.delay(123, "Hello!")
return JsonResponse({'task_id': task.id})
6. 실시간 대응 시나리오: 한정판 판매 시스템
프론트엔드 로직:
function startSale() {
axios.post('/api/sale/start/')
.then(response => {
const taskId = response.data.task_id;
const interval = setInterval(() => {
axios.get(`/api/sale/status/${taskId}/`)
.then(res => {
if (res.data.status === 'SUCCESS') {
alert('구매 성공!');
clearInterval(interval);
} else if (res.data.status === 'FAILURE') {
alert('매진되었습니다.');
clearInterval(interval);
}
});
}, 3000);
});
}
백엔드 구현:
@app.task
def process_sale_task(user_id):
try:
with transaction.atomic():
product = Product.objects.select_for_update().get(id=1)
if product.stock > 0:
product.stock -= 1
product.save()
Order.objects.create(user_id=user_id, product=product)
return {'success': True, 'msg': '구매 완료'}
else:
return {'success': False, 'msg': '품절'}
except Exception as e:
return {'success': False, 'msg': str(e)}
# API 뷰
class StartSaleView(APIView):
def post(self, request):
task = process_sale_task.delay(request.user.id)
return Response({'task_id': task.id})
class SaleStatusView(APIView):
def get(self, request, task_id):
result = AsyncResult(task_id, app=app)
if result.ready():
data = result.get()
status = 'SUCCESS' if data['success'] else 'FAILURE'
else:
status = 'PENDING'
return Response({'status': status})
7. 캐시 일관성 유지 전략
7.1 캐싱 도입
API 응답 속도 향상을 위한 캐싱:
class BannerListView(APIView):
def get(self, request):
cache_key = 'homepage_banners'
cached_data = cache.get(cache_key)
if cached_data:
return Response(cached_data)
banners = Banner.objects.filter(active=True).order_by('priority')
serializer = BannerSerializer(banners, many=True)
serialized_data = serializer.data
# CDN 호스트 추가
base_url = settings.CDN_URL
for item in serialized_data:
if item['image']:
item['image'] = f"{base_url}{item['image']}"
cache.set(cache_key, serialized_data, timeout=60*60)
return Response(serialized_data)
7.2 일정 기반 캐시 갱신
데이터베이스 변경 시 즉각적인 캐시 무효화가 어려운 경우, 주기적 갱신으로 일관성 확보:
@app.task
def refresh_homepage_cache():
cache_key = 'homepage_banners'
try:
banners = Banner.objects.filter(active=True).order_by('priority')
serializer = BannerSerializer(banners, many=True)
data = serializer.data
base_url = settings.CDN_URL
for item in data:
if item['image']:
item['image'] = f"{base_url}{item['image']}"
cache.set(cache_key, data, timeout=60*60)
return f"Cache updated at {datetime.now()}"
except Exception as e:
return f"Update failed: {str(e)}"
# 스케줄 등록
app.conf.beat_schedule.update({
'refresh-banners': {
'task': 'celery_app.data_tasks.refresh_homepage_cache',
'schedule': timedelta(minutes=30),
}
})
이 방식은 짧은 시간 동안 불일치 상태가 존재할 수 있지만, 대부분의 웹 서비스에서 수용 가능한 trade-off입니다.