Redis와 Express를 이용한 데이터 구조 및 미들웨어 통합
Node.js 환경에서 Express와 Redis를 결합하면 고성능 캐싱, 세션 관리, 실시간 처리 시스템을 구축할 수 있습니다. 본 문서에서는 ioredis를 중심으로 다양한 Redis 데이터 타입 사용법과, 분산 락, 발행/구독 모델, 그리고 확장된 미들웨어 패턴까지 심층적으로 다룹니다.
1. 핵심 데이터 타입별 연동 예제
다음은 각 Redis 데이터 구조에 맞는 REST API 엔드포인트 설계입니다.
const express = require('express');
const redis = require('ioredis');
const client = new redis({
host: 'localhost',
port: 6379
});
const app = express();
app.use(express.json());
// 문자열: 간단한 상태 저장 또는 카운터
app.post('/data/string', async (req, res) => {
const { key, value, expire } = req.body;
await client.set(key, value, 'EX', expire || 3600);
res.json({ status: 'saved', key });
});
app.get('/data/string/:key', async (req, res) => {
const data = await client.get(req.params.key);
res.json({ value: data });
});
// 해시: 객체 형태의 정보 저장 (예: 사용자 프로필)
app.post('/data/hash', async (req, res) => {
const { entity, field, val } = req.body;
await client.hset(entity, field, val);
res.json({ updated: `${entity}.${field}` });
});
app.get('/data/hash/:entity', async (req, res) => {
const data = await client.hgetall(req.params.entity);
res.json(data);
});
// 리스트: FIFO 기반 작업 대기열 또는 최근 활동 로그
app.post('/queue/push', async (req, res) => {
const { listName, item } = req.body;
await client.lpush(listName, JSON.stringify(item));
res.json({ enqueued: item });
});
app.get('/queue/pop/:name', async (req, res) => {
const result = await client.brpop(req.params.name, 5); // 블로킹 팝
res.json(result ? { popped: JSON.parse(result[1]) } : null);
});
// 집합(Set): 중복 방지를 위한 태그, IP 차단 목록 등
app.put('/tags/add', async (req, res) => {
const { tagSet, tag } = req.body;
const added = await client.sadd(tagSet, tag);
res.json({ action: added > 0 ? 'created' : 'duplicate' });
});
app.get('/tags/all/:set', async (req, res) => {
const tags = await client.smembers(req.params.set);
res.json({ tags });
});
// 정렬된 집합(Sorted Set): 리더보드, 우선순위 큐
app.put('/leaderboard/rank', async (req, res) => {
const { board, score, player } = req.body;
await client.zadd(board, score, player);
res.json({ ranked: player, score });
});
app.get('/leaderboard/top/:board', async (req, res) => {
const entries = await client.zrevrange(req.params.board, 0, 9, 'WITHSCORES');
const list = [];
for (let i = 0; i < entries.length; i += 2) {
list.push({ player: entries[i], score: Number(entries[i + 1]) });
}
res.json(list);
});
2. 고급 기능: TTL 기반 캐시 및 분산 제어
자동 만료 캐시
고정 시간 후 자동 삭제되는 캐시를 생성합니다.
app.post('/cache/temp', async (req, res) => {
const { k, v, ttlSec = 60 } = req.body;
await client.setex(k, ttlSec, v);
res.send(`Cached ${k} for ${ttlSec}s`);
});
app.get('/cache/ttl/:key', async (req, res) => {
const remaining = await client.ttl(req.params.key);
res.json({ ttl: remaining });
});
분산 락 구현
NX(Not eXists) 옵션과 Lua 스크립트를 조합해 안전한 락 획득 및 해제를 수행합니다.
app.post('/lock/acquire', async (req, res) => {
const { resource, token, timeout = 10 } = req.body;
const result = await client.set(resource, token, 'NX', 'EX', timeout);
res.json({ acquired: !!result });
});
app.post('/lock/release', async (req, res) => {
const { resource, token } = req.body;
const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`;
const released = await client.eval(script, 1, resource, token);
res.json({ released: Boolean(released) });
});
3. 이벤트 기반 아키텍처: Pub/Sub
실시간 알림이나 마이크로서비스 간 커뮤니케이션에 적합합니다.
// 별도의 구독 클라이언트 유지
const subscriber = new redis();
subscriber.subscribe('alerts');
subscriber.on('message', (channel, message) => {
console.log(`🔔 [${channel}] ${message}`);
});
// HTTP 요청을 통해 메시지 발행
app.post('/notify', async (req, res) => {
const { channel, msg } = req.body;
await client.publish(channel, msg);
res.send('Published');
});
4. 확장된 분산 처리 패턴
Redlock 알고리즘 기반 다중 인스턴스 락
단일 장애점 없이 분산 환경에서 안정적인 락을 제공합니다.
const Redlock = require('redlock');
const redlock = new Redlock([client], { retryCount: 3 });
app.post('/secure-action', async (req, res) => {
try {
const lock = await redlock.acquire(['resource:critical'], 2000);
// 비즈니스 로직 실행
await new Promise(r => setTimeout(r, 1000));
await lock.release();
res.send('Action completed safely');
} catch (err) {
res.status(503).send('Resource busy');
}
});
Redis Streams: 지속성 보장 큐
메시지 유실 없이 소비자 그룹 기반 처리가 가능합니다.
const STREAM = 'jobs';
const GROUP = 'workers';
// 스트림 및 그룹 초기화
await client.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM').catch(() => {});
app.post('/job/submit', async (req, res) => {
await client.xadd(STREAM, '*', 'task', JSON.stringify(req.body.task));
res.send('Job queued');
});
// 백그라운드 소비자
async function processJobs() {
while (true) {
const result = await client.xreadgroup(
'GROUP', GROUP, 'worker-1',
'BLOCK', 5000,
'COUNT', 1,
'STREAMS', STREAM, '>'
);
if (!result) continue;
const [[, messages]] = result;
for (const [id, fields] of messages) {
const taskData = JSON.parse(fields[1]);
console.log('Processing:', taskData);
await client.xack(STREAM, GROUP, id);
}
}
}
processJobs();
5. 비동기 작업 큐: Bull 활용
Bull은 Redis 기반의 강력한 큐 시스템으로, 실패 재시도, 지연 실행 등을 지원합니다.
const Queue = require('bull');
const mailQueue = new Queue('mails');
app.post('/email/queue', async (req, res) => {
const job = await mailQueue.add(req.body, {
attempts: 3,
backoff: 5000,
removeOnComplete: true
});
res.json({ jobId: job.id });
});
mailQueue.process(async (job) => {
console.log(`Sending email to ${job.data.to}`);
// 실제 이메일 전송 로직
});
6. 블룸 필터를 이용한 불필요한 조회 방지
대량의 존재하지 않는 키 조회(캐시 펀치홀)를 사전 차단합니다.
const { BloomFilter } = require('bloom-filters');
// 메모리 기반 필터 (클러스터 환경선 공유 필요)
const filter = BloomFilter.fromSize(10000, 0.01);
app.get('/user/check/:id', (req, res) => {
const userId = req.params.id;
if (filter.has(userId)) {
res.json({ exists: true }); // 가능성 있음
} else {
filter.add(userId); // 첫 접근은 기록
res.json({ exists: false }); // 실제로는 DB 확인 필요
}
});
7. RabbitMQ 연동: 외부 메시지 브로커 활용
AMQP 기반의 RabbitMQ는 복잡한 라우팅과 신뢰성 있는 전달이 필요한 경우 적합합니다.
기본 큐 통신
const amqp = require('amqplib');
async function publishTask(payload) {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue('tasks', { durable: true });
ch.sendToQueue('tasks', Buffer.from(JSON.stringify(payload)), { persistent: true });
setTimeout(() => { ch.close(); conn.close(); }, 500);
}
app.post('/task/send', async (req, res) => {
await publishTask(req.body);
res.send('Task dispatched');
});
Topic 기반 라우팅
카테고리 기반 메시징 (예: order.created.us)
await ch.assertExchange('logs', 'topic');
ch.publish('logs', 'order.failed.payment', Buffer.from('Failed'));
DLX와 TTL을 이용한 지연 처리
만료된 메시지를 다른 교환기에 전달하여 지연 큐처럼 사용
await ch.assertQueue('delayed', {
messageTtl: 10000,
deadLetterExchange: 'real-tasks'
});
8. 신뢰성 있는 메시지 전달 및 멱등성 보장
Confirm 모드와 고유 메시지 ID를 조합하여 손실 없는 전달을 구현합니다.
// 생산자 측 confirm
const ch = await conn.createConfirmChannel();
ch.publish(ex, rk, buf, {}, (err) => {
if (err) console.error('Delivery failed');
});
// 소비자 측 멱등성 처리
const processedKey = `done:${msg.properties.messageId}`;
if (await client.get(processedKey)) {
ch.ack(msg);
return;
}
await client.setex(processedKey, 86400, '1');
// 실제 처리 수행
ch.ack(msg);
서버 시작:
app.listen(3000, () => {
console.log('Server running on port 3000');
});