분산 시스템 아키텍처를 위한 Redis의 5가지 핵심 활용 패턴
분산 시스템 환경에서 데이터의 일관성, 가용성, 그리고 성능을 동시에 보장하는 것은 매우 복잡한 과제입니다. Redis는 단순한 인메모리 키-값 저장소를 넘어, 다양한 데이터 구조와 기능을 제공함으로써 분산 아키텍처의 핵심 인프라로 자리 잡았습니다. 본 글에서는 Redis를 활용하여 분산 시스템의 주요 문제를 해결하는 5가지 핵심 패턴과 그 구현 방식을 살펴봅니다.
1. 캐싱(Caching) 레이어 구현
데이터베이스의 부하를 줄이고 API 응답 속도를 향상시키기 위해 Redis를 캐싱 레이어로 사용하는 것은 가장 기본적이면서도 강력한 패턴입니다. Cache-Aside 패턴을 적용하면, 애플리케이션은 먼저 Redis에서 데이터를 조회하고, 캐시 미스(Cache Miss) 발생 시에만 데이터베이스에 질의한 후 결과를 캐시에 저장합니다.
import redis
import json
class CacheManager:
def __init__(self, host='localhost', port=6379, db=0):
self.client = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)
def fetch_user_profile(self, user_id):
cache_key = f"user:profile:{user_id}"
cached_data = self.client.get(cache_key)
if cached_data:
return json.loads(cached_data)
# 데이터베이스 조회 시뮬레이션
db_data = {"id": user_id, "name": "Alice", "role": "admin"}
# 캐시에 데이터 저장 및 TTL(3600초) 설정
self.client.setex(cache_key, 3600, json.dumps(db_data))
return db_data
2. Redis Streams를 활용한 비동기 메시지 큐
耗时이 긴 작업을 동기적으로 처리하면 시스템의 전체적인 처리량이 저하됩니다. Redis Streams는 로그 기반의 메시지 큐로, 생산자와 소비자를 분리하여 비동기 처리를 가능하게 합니다. Consumer Group 기능을 통해 여러 워커가 메시지를 병렬로 안전하게 소비할 수 있습니다.
import redis
class TaskQueue:
def __init__(self):
self.client = redis.StrictRedis(decode_responses=True)
self.stream_name = "task_processing_stream"
self.group_name = "worker_group"
try:
self.client.xgroup_create(self.stream_name, self.group_name, id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass # 이미 그룹이 존재하는 경우 무시
def enqueue_task(self, task_payload):
self.client.xadd(self.stream_name, task_payload)
def consume_tasks(self, consumer_name):
while True:
messages = self.client.xreadgroup(
groupname=self.group_name,
consumername=consumer_name,
streams={self.stream_name: '>'},
count=1,
block=2000
)
if messages:
for stream, msg_list in messages:
for msg_id, msg_data in msg_list:
print(f"Processing {msg_data} (ID: {msg_id})")
# 작업 완료 후 ACK 전송
self.client.xack(self.stream_name, self.group_name, msg_id)
3. 동시성 제어를 위한 분산 락(Distributed Lock)
여러 노드가 동일한 공유 리소스에 접근할 때 데이터 정합성을 깨뜨리지 않기 위해서는 분산 락이 필요합니다. Redis의 SET 명령어에 NX(Not eXists)와 EX(Expire) 옵션을 결합하여 락을 획득하며, Lua 스크립트를 사용하여 락 해제 과정의 원자성을 보장합니다.
import redis
import uuid
class DistributedLock:
def __init__(self):
self.client = redis.StrictRedis(decode_responses=True)
# Lua 스크립트를 사용하여 락 해제 시 원자성 보장 (자신이 획득한 락만 해제)
self.unlock_script = self.client.register_script("""
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
""")
def acquire(self, lock_name, timeout=10):
token = str(uuid.uuid4())
if self.client.set(f"lock:{lock_name}", token, nx=True, ex=timeout):
return token
return None
def release(self, lock_name, token):
return self.unlock_script(keys=[f"lock:{lock_name}"], args=[token])
4. 무상태(Stateless) 서버를 위한 세션 저장소
웹 서버에 세션 데이터를 로컬 메모리에 저장하면 서버의 수평적 확장(Scale-out)이 어려워집니다. Redis를 중앙 세션 저장소로 활용하면 웹 서버를 완전히 무상태(Stateless)로 만들 수 있으며, 로드 밸런서를 통한 유연한 트래픽 분산이 가능해집니다.
import redis
class SessionStore:
def __init__(self):
self.client = redis.StrictRedis(decode_responses=True)
def save_session(self, session_id, session_data, ttl_seconds=1800):
key = f"session:{session_id}"
self.client.hset(key, mapping=session_data)
self.client.expire(key, ttl_seconds)
def get_session(self, session_id):
key = f"session:{session_id}"
data = self.client.hgetall(key)
if data:
# 사용자가 활발히 활동할 경우 세션 만료 시간 갱신
self.client.expire(key, 1800)
return data
5. 시스템 안정성을 위한 속도 제한(Rate Limiting)
특정 클라이언트나 IP에서 발생하는 과도한 API 요청은 서비스 장애를 유발할 수 있습니다. Redis의 INCR 명령어와 만료 시간(TTL)을 조합하면 고정 창(Fixed Window) 방식의 속도 제한기를 효율적으로 구현할 수 있습니다. 파이프라인(Pipeline)을 사용하여 네트워크 왕복 시간을 줄이고 원자성을 높입니다.
import redis
class RateLimiter:
def __init__(self):
self.client = redis.StrictRedis(decode_responses=True)
def is_request_allowed(self, client_ip, max_requests, window_seconds):
key = f"ratelimit:{client_ip}"
pipe = self.client.pipeline()
pipe.incr(key)
pipe.expire(key, window_seconds)
current_count, _ = pipe.execute()
return current_count <= max_requests