분산 시스템 아키텍처를 위한 Redis의 5가지 핵심 활용 패턴

분산 시스템 아키텍처를 위한 Redis의 5가지 핵심 활용 패턴

분산 시스템 환경에서 데이터의 일관성, 가용성, 그리고 성능을 동시에 보장하는 것은 매우 복잡한 과제입니다. Redis는 단순한 인메모리 키-값 저장소를 넘어, 다양한 데이터 구조와 기능을 제공함으로써 분산 아키텍처의 핵심 인프라로 자리 잡았습니다. 본 글에서는 Redis를 활용하여 분산 시스템의 주요 문제를 해결하는 5가지 핵심 패턴과 그 구현 방식을 살펴봅니다.

1. 캐싱(Caching) 레이어 구현

데이터베이스의 부하를 줄이고 API 응답 속도를 향상시키기 위해 Redis를 캐싱 레이어로 사용하는 것은 가장 기본적이면서도 강력한 패턴입니다. Cache-Aside 패턴을 적용하면, 애플리케이션은 먼저 Redis에서 데이터를 조회하고, 캐시 미스(Cache Miss) 발생 시에만 데이터베이스에 질의한 후 결과를 캐시에 저장합니다.

import redis
import json

class CacheManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.client = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True)

    def fetch_user_profile(self, user_id):
        cache_key = f"user:profile:{user_id}"
        cached_data = self.client.get(cache_key)
        
        if cached_data:
            return json.loads(cached_data)
            
        # 데이터베이스 조회 시뮬레이션
        db_data = {"id": user_id, "name": "Alice", "role": "admin"}
        
        # 캐시에 데이터 저장 및 TTL(3600초) 설정
        self.client.setex(cache_key, 3600, json.dumps(db_data))
        return db_data

2. Redis Streams를 활용한 비동기 메시지 큐

耗时이 긴 작업을 동기적으로 처리하면 시스템의 전체적인 처리량이 저하됩니다. Redis Streams는 로그 기반의 메시지 큐로, 생산자와 소비자를 분리하여 비동기 처리를 가능하게 합니다. Consumer Group 기능을 통해 여러 워커가 메시지를 병렬로 안전하게 소비할 수 있습니다.

import redis

class TaskQueue:
    def __init__(self):
        self.client = redis.StrictRedis(decode_responses=True)
        self.stream_name = "task_processing_stream"
        self.group_name = "worker_group"
        
        try:
            self.client.xgroup_create(self.stream_name, self.group_name, id='0', mkstream=True)
        except redis.exceptions.ResponseError:
            pass  # 이미 그룹이 존재하는 경우 무시

    def enqueue_task(self, task_payload):
        self.client.xadd(self.stream_name, task_payload)

    def consume_tasks(self, consumer_name):
        while True:
            messages = self.client.xreadgroup(
                groupname=self.group_name, 
                consumername=consumer_name, 
                streams={self.stream_name: '>'}, 
                count=1, 
                block=2000
            )
            if messages:
                for stream, msg_list in messages:
                    for msg_id, msg_data in msg_list:
                        print(f"Processing {msg_data} (ID: {msg_id})")
                        # 작업 완료 후 ACK 전송
                        self.client.xack(self.stream_name, self.group_name, msg_id)

3. 동시성 제어를 위한 분산 락(Distributed Lock)

여러 노드가 동일한 공유 리소스에 접근할 때 데이터 정합성을 깨뜨리지 않기 위해서는 분산 락이 필요합니다. Redis의 SET 명령어에 NX(Not eXists)와 EX(Expire) 옵션을 결합하여 락을 획득하며, Lua 스크립트를 사용하여 락 해제 과정의 원자성을 보장합니다.

import redis
import uuid

class DistributedLock:
    def __init__(self):
        self.client = redis.StrictRedis(decode_responses=True)
        # Lua 스크립트를 사용하여 락 해제 시 원자성 보장 (자신이 획득한 락만 해제)
        self.unlock_script = self.client.register_script("""
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
        """)

    def acquire(self, lock_name, timeout=10):
        token = str(uuid.uuid4())
        if self.client.set(f"lock:{lock_name}", token, nx=True, ex=timeout):
            return token
        return None

    def release(self, lock_name, token):
        return self.unlock_script(keys=[f"lock:{lock_name}"], args=[token])

4. 무상태(Stateless) 서버를 위한 세션 저장소

웹 서버에 세션 데이터를 로컬 메모리에 저장하면 서버의 수평적 확장(Scale-out)이 어려워집니다. Redis를 중앙 세션 저장소로 활용하면 웹 서버를 완전히 무상태(Stateless)로 만들 수 있으며, 로드 밸런서를 통한 유연한 트래픽 분산이 가능해집니다.

import redis

class SessionStore:
    def __init__(self):
        self.client = redis.StrictRedis(decode_responses=True)

    def save_session(self, session_id, session_data, ttl_seconds=1800):
        key = f"session:{session_id}"
        self.client.hset(key, mapping=session_data)
        self.client.expire(key, ttl_seconds)

    def get_session(self, session_id):
        key = f"session:{session_id}"
        data = self.client.hgetall(key)
        if data:
            # 사용자가 활발히 활동할 경우 세션 만료 시간 갱신
            self.client.expire(key, 1800) 
        return data

5. 시스템 안정성을 위한 속도 제한(Rate Limiting)

특정 클라이언트나 IP에서 발생하는 과도한 API 요청은 서비스 장애를 유발할 수 있습니다. Redis의 INCR 명령어와 만료 시간(TTL)을 조합하면 고정 창(Fixed Window) 방식의 속도 제한기를 효율적으로 구현할 수 있습니다. 파이프라인(Pipeline)을 사용하여 네트워크 왕복 시간을 줄이고 원자성을 높입니다.

import redis

class RateLimiter:
    def __init__(self):
        self.client = redis.StrictRedis(decode_responses=True)

    def is_request_allowed(self, client_ip, max_requests, window_seconds):
        key = f"ratelimit:{client_ip}"
        pipe = self.client.pipeline()
        pipe.incr(key)
        pipe.expire(key, window_seconds)
        current_count, _ = pipe.execute()
        
        return current_count <= max_requests

태그: Redis DistributedSystems Caching RedisStreams DistributedLock

6월 7일 23:11에 게시됨