먼저 Redis 서버를 설치합니다. 윈도우 환경에서는 5.0 이상 버전이 필요하며, 공식 릴리스 페이지에서 다운로드할 수 있습니다: Releases · tporadowski/redis (github.com)
다음으로 Python 환경에 필요한 패키지를 설치합니다. 다음과 같은 패키지들이 필수입니다:
channelsdaphneasgi_redisredis
설치 후, 장고 설정 파일(settings.py)에 아래와 같이 구성합니다.
WSGI_APPLICATION = "start_up_file_km.wsgi.application"
ASGI_APPLICATION = "start_up_file_km.asgi.application"
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
'hosts': [('127.0.0.1', 6379)],
},
},
}
ASGI 애플리케이션 설정 파일(asgi.py)을 작성합니다. 이 파일은 장고 앱의 비동기 처리 기반을 정의합니다.
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "LimsApp.settings.settings")
django_asgi_app = get_asgi_application()
application = ProtocolTypeRouter({
"http": django_asgi_app,
"websocket": AuthMiddlewareStack(
URLRouter(
socket_urlpatterns
)
),
})
웹소켓 라우팅을 위한 경로 설정 파일을 만듭니다. 예시로 utils/consumers.py에서 소비자 클래스를 정의하고, 라우팅 목록을 구성합니다.
from utils import consumers
from django.urls import path
# 웹소켓 경로 매핑
socket_urlpatterns = [
path('socket/all/', consumers.ChatConsumer.as_asgi()),
]
최종적으로, 웹소켓 소비자 클래스를 구현합니다. 이 클래스는 연결, 메시지 수신, 전송 등의 로직을 처리합니다.
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
# 그룹에 참여
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# 그룹 탈퇴
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
data = json.loads(text_data)
message = data.get('message')
# 그룹 내 모든 클라이언트에 메시지 전송
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'content': message
}
)
async def chat_message(self, event):
message = event['content']
# 클라이언트에게 응답 전송
await self.send(text_data=json.dumps({
'message': message
}))