Express 기반 Redis 및 메시지 큐 전문 활용 가이드

Redis와 Express를 이용한 데이터 구조 및 미들웨어 통합

Node.js 환경에서 ExpressRedis를 결합하면 고성능 캐싱, 세션 관리, 실시간 처리 시스템을 구축할 수 있습니다. 본 문서에서는 ioredis를 중심으로 다양한 Redis 데이터 타입 사용법과, 분산 락, 발행/구독 모델, 그리고 확장된 미들웨어 패턴까지 심층적으로 다룹니다.

1. 핵심 데이터 타입별 연동 예제

다음은 각 Redis 데이터 구조에 맞는 REST API 엔드포인트 설계입니다.

const express = require('express');
const redis = require('ioredis');

const client = new redis({
  host: 'localhost',
  port: 6379
});

const app = express();
app.use(express.json());

// 문자열: 간단한 상태 저장 또는 카운터
app.post('/data/string', async (req, res) => {
  const { key, value, expire } = req.body;
  await client.set(key, value, 'EX', expire || 3600);
  res.json({ status: 'saved', key });
});

app.get('/data/string/:key', async (req, res) => {
  const data = await client.get(req.params.key);
  res.json({ value: data });
});

// 해시: 객체 형태의 정보 저장 (예: 사용자 프로필)
app.post('/data/hash', async (req, res) => {
  const { entity, field, val } = req.body;
  await client.hset(entity, field, val);
  res.json({ updated: `${entity}.${field}` });
});

app.get('/data/hash/:entity', async (req, res) => {
  const data = await client.hgetall(req.params.entity);
  res.json(data);
});

// 리스트: FIFO 기반 작업 대기열 또는 최근 활동 로그
app.post('/queue/push', async (req, res) => {
  const { listName, item } = req.body;
  await client.lpush(listName, JSON.stringify(item));
  res.json({ enqueued: item });
});

app.get('/queue/pop/:name', async (req, res) => {
  const result = await client.brpop(req.params.name, 5); // 블로킹 팝
  res.json(result ? { popped: JSON.parse(result[1]) } : null);
});

// 집합(Set): 중복 방지를 위한 태그, IP 차단 목록 등
app.put('/tags/add', async (req, res) => {
  const { tagSet, tag } = req.body;
  const added = await client.sadd(tagSet, tag);
  res.json({ action: added > 0 ? 'created' : 'duplicate' });
});

app.get('/tags/all/:set', async (req, res) => {
  const tags = await client.smembers(req.params.set);
  res.json({ tags });
});

// 정렬된 집합(Sorted Set): 리더보드, 우선순위 큐
app.put('/leaderboard/rank', async (req, res) => {
  const { board, score, player } = req.body;
  await client.zadd(board, score, player);
  res.json({ ranked: player, score });
});

app.get('/leaderboard/top/:board', async (req, res) => {
  const entries = await client.zrevrange(req.params.board, 0, 9, 'WITHSCORES');
  const list = [];
  for (let i = 0; i < entries.length; i += 2) {
    list.push({ player: entries[i], score: Number(entries[i + 1]) });
  }
  res.json(list);
});

2. 고급 기능: TTL 기반 캐시 및 분산 제어

자동 만료 캐시

고정 시간 후 자동 삭제되는 캐시를 생성합니다.

app.post('/cache/temp', async (req, res) => {
  const { k, v, ttlSec = 60 } = req.body;
  await client.setex(k, ttlSec, v);
  res.send(`Cached ${k} for ${ttlSec}s`);
});

app.get('/cache/ttl/:key', async (req, res) => {
  const remaining = await client.ttl(req.params.key);
  res.json({ ttl: remaining });
});

분산 락 구현

NX(Not eXists) 옵션과 Lua 스크립트를 조합해 안전한 락 획득 및 해제를 수행합니다.

app.post('/lock/acquire', async (req, res) => {
  const { resource, token, timeout = 10 } = req.body;
  const result = await client.set(resource, token, 'NX', 'EX', timeout);
  res.json({ acquired: !!result });
});

app.post('/lock/release', async (req, res) => {
  const { resource, token } = req.body;
  const script = `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("DEL", KEYS[1])
    else
      return 0
    end
  `;
  const released = await client.eval(script, 1, resource, token);
  res.json({ released: Boolean(released) });
});

3. 이벤트 기반 아키텍처: Pub/Sub

실시간 알림이나 마이크로서비스 간 커뮤니케이션에 적합합니다.

// 별도의 구독 클라이언트 유지
const subscriber = new redis();

subscriber.subscribe('alerts');

subscriber.on('message', (channel, message) => {
  console.log(`🔔 [${channel}] ${message}`);
});

// HTTP 요청을 통해 메시지 발행
app.post('/notify', async (req, res) => {
  const { channel, msg } = req.body;
  await client.publish(channel, msg);
  res.send('Published');
});

4. 확장된 분산 처리 패턴

Redlock 알고리즘 기반 다중 인스턴스 락

단일 장애점 없이 분산 환경에서 안정적인 락을 제공합니다.

const Redlock = require('redlock');
const redlock = new Redlock([client], { retryCount: 3 });

app.post('/secure-action', async (req, res) => {
  try {
    const lock = await redlock.acquire(['resource:critical'], 2000);
    // 비즈니스 로직 실행
    await new Promise(r => setTimeout(r, 1000));
    await lock.release();
    res.send('Action completed safely');
  } catch (err) {
    res.status(503).send('Resource busy');
  }
});

Redis Streams: 지속성 보장 큐

메시지 유실 없이 소비자 그룹 기반 처리가 가능합니다.

const STREAM = 'jobs';
const GROUP = 'workers';

// 스트림 및 그룹 초기화
await client.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM').catch(() => {});

app.post('/job/submit', async (req, res) => {
  await client.xadd(STREAM, '*', 'task', JSON.stringify(req.body.task));
  res.send('Job queued');
});

// 백그라운드 소비자
async function processJobs() {
  while (true) {
    const result = await client.xreadgroup(
      'GROUP', GROUP, 'worker-1',
      'BLOCK', 5000,
      'COUNT', 1,
      'STREAMS', STREAM, '>'
    );

    if (!result) continue;

    const [[, messages]] = result;
    for (const [id, fields] of messages) {
      const taskData = JSON.parse(fields[1]);
      console.log('Processing:', taskData);
      await client.xack(STREAM, GROUP, id);
    }
  }
}
processJobs();

5. 비동기 작업 큐: Bull 활용

Bull은 Redis 기반의 강력한 큐 시스템으로, 실패 재시도, 지연 실행 등을 지원합니다.

const Queue = require('bull');
const mailQueue = new Queue('mails');

app.post('/email/queue', async (req, res) => {
  const job = await mailQueue.add(req.body, {
    attempts: 3,
    backoff: 5000,
    removeOnComplete: true
  });
  res.json({ jobId: job.id });
});

mailQueue.process(async (job) => {
  console.log(`Sending email to ${job.data.to}`);
  // 실제 이메일 전송 로직
});

6. 블룸 필터를 이용한 불필요한 조회 방지

대량의 존재하지 않는 키 조회(캐시 펀치홀)를 사전 차단합니다.

const { BloomFilter } = require('bloom-filters');

// 메모리 기반 필터 (클러스터 환경선 공유 필요)
const filter = BloomFilter.fromSize(10000, 0.01);

app.get('/user/check/:id', (req, res) => {
  const userId = req.params.id;
  if (filter.has(userId)) {
    res.json({ exists: true }); // 가능성 있음
  } else {
    filter.add(userId); // 첫 접근은 기록
    res.json({ exists: false }); // 실제로는 DB 확인 필요
  }
});

7. RabbitMQ 연동: 외부 메시지 브로커 활용

AMQP 기반의 RabbitMQ는 복잡한 라우팅과 신뢰성 있는 전달이 필요한 경우 적합합니다.

기본 큐 통신

const amqp = require('amqplib');

async function publishTask(payload) {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();
  await ch.assertQueue('tasks', { durable: true });
  ch.sendToQueue('tasks', Buffer.from(JSON.stringify(payload)), { persistent: true });
  setTimeout(() => { ch.close(); conn.close(); }, 500);
}

app.post('/task/send', async (req, res) => {
  await publishTask(req.body);
  res.send('Task dispatched');
});

Topic 기반 라우팅

카테고리 기반 메시징 (예: order.created.us)

await ch.assertExchange('logs', 'topic');
ch.publish('logs', 'order.failed.payment', Buffer.from('Failed'));

DLX와 TTL을 이용한 지연 처리

만료된 메시지를 다른 교환기에 전달하여 지연 큐처럼 사용

await ch.assertQueue('delayed', {
  messageTtl: 10000,
  deadLetterExchange: 'real-tasks'
});

8. 신뢰성 있는 메시지 전달 및 멱등성 보장

Confirm 모드와 고유 메시지 ID를 조합하여 손실 없는 전달을 구현합니다.

// 생산자 측 confirm
const ch = await conn.createConfirmChannel();
ch.publish(ex, rk, buf, {}, (err) => {
  if (err) console.error('Delivery failed');
});

// 소비자 측 멱등성 처리
const processedKey = `done:${msg.properties.messageId}`;
if (await client.get(processedKey)) {
  ch.ack(msg);
  return;
}
await client.setex(processedKey, 86400, '1');
// 실제 처리 수행
ch.ack(msg);

서버 시작:

app.listen(3000, () => {
  console.log('Server running on port 3000');
});

태그: Redis ioredis Express RabbitMQ Bull

5월 30일 20:34에 게시됨