Ant Design와 FastAPI로 실시간 시스템 명령 실행 및 로그 스트리밍 구현

구현 목표 및 기술 선정

웹 인터페이스에서 서버 측 명령을 실행하고 출력 결과를 실시간으로 확인하는 기능을 구축한다. 프론트엔드는 Ant Design의 컴포넌트 체계를 활용해 명령 입력과 로그 표시 영역을 구성하고, 백엔드는 FastAPI의 비동기 구조를 바탕으로 WebSocket을 통한 스트리밍 응답을 처리한다.

프론트엔드 구성

명령 입력창, 실행 버튼, 그리고 스크롤 가능한 로그 뷰어를 조합한다. WebSocket 연결을 통해 서버가 푸시하는 메시지를 수신하여 상태에 누적한다.

import React, { useState, useRef, useEffect } from 'react';
import { Input, Button, Card, Space, Typography } from 'antd';
import { PlayCircleOutlined, ClearOutlined } from '@ant-design/icons';

const { TextArea } = Input;
const { Text } = Typography;

function RemoteExecutor() {
  const [instruction, setInstruction] = useState('');
  const [outputLines, setOutputLines] = useState([]);
  const [isProcessing, setIsProcessing] = useState(false);
  const scrollRef = useRef(null);
  const socketRef = useRef(null);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8000/stream/execute');
    socketRef.current = ws;

    ws.onmessage = (event) => {
      const payload = JSON.parse(event.data);
      if (payload.type === 'chunk') {
        setOutputLines(prev => [...prev, payload.content]);
      } else if (payload.type === 'complete') {
        setIsProcessing(false);
      }
    };

    return () => ws.close();
  }, []);

  useEffect(() => {
    if (scrollRef.current) {
      scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
    }
  }, [outputLines]);

  const dispatchInstruction = () => {
    if (!instruction.trim() || !socketRef.current) return;
    
    setIsProcessing(true);
    setOutputLines([]);
    socketRef.current.send(JSON.stringify({ 
      operation: instruction.trim() 
    }));
  };

  const wipeHistory = () => setOutputLines([]);

  return (
    <Card title="원격 명령 실행기" style={{ width: 700, margin: '40px auto' }}>
      <Space direction="vertical" style={{ width: '100%' }} size="middle">
        <Input
          placeholder="실행할 명령을 입력하세요 (예: pwd, whoami)"
          value={instruction}
          onChange={(e) => setInstruction(e.target.value)}
          disabled={isProcessing}
          onPressEnter={dispatchInstruction}
        />
        <Space>
          <Button 
            type="primary" 
            icon={<PlayCircleOutlined />}
            onClick={dispatchInstruction}
            loading={isProcessing}
          >
            실행
          </Button>
          <Button 
            icon={<ClearOutlined />} 
            onClick={wipeHistory}
          >
            로그 지우기
          </Button>
        </Space>
        <div 
          ref={scrollRef}
          style={{ 
            height: 320, 
            overflowY: 'auto',
            background: '#f6ffed',
            padding: 12,
            borderRadius: 6,
            fontFamily: 'monospace',
            fontSize: 13
          }}
        >
          {outputLines.map((line, idx) => (
            <div key={idx}>
              <Text code>{line}</Text>
            </div>
          ))}
          {outputLines.length === 0 && (
            <Text type="secondary">실행 로그가 여기에 표시됩니다</Text>
          )}
        </div>
      </Space>
    </Card>
  );
}

export default RemoteExecutor;

백엔드 구현

FastAPI에서 WebSocket 엔드포인트를 정의하고, 수신한 명령을 검증한 뒤 asyncio 서브프로세스로 실행한다. 표준 출력과 에러 스트림을 비동기로 읽어 클라이언트에 전송한다.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import shlex
from typing import Set

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 허용된 명령어 기반 접두사
PERMITTED_PREFIXES = {"ls", "cat", "pwd", "whoami", "ps", "df", "free", "uptime"}
MAX_OUTPUT_LENGTH = 10000

class ConnectionPool:
    def __init__(self):
        self.active: Set[WebSocket] = set()
    
    async def attach(self, ws: WebSocket):
        await ws.accept()
        self.active.add(ws)
    
    def detach(self, ws: WebSocket):
        self.active.discard(ws)

pool = ConnectionPool()

def sanitize_operation(raw_cmd: str) -> tuple[bool, str]:
    """명령어 검증 및 정제"""
    parts = shlex.split(raw_cmd.strip())
    if not parts:
        return False, "빈 명령어"
    
    base = parts[0]
    if base not in PERMITTED_PREFIXES:
        return False, f"'{base}' 명령은 허용되지 않습니다"
    
    # 위험한 문자 필터링
    dangerous = {';', '&&', '||', '|', '`', '$', '(', ')'}
    if any(c in raw_cmd for c in dangerous):
        return False, "특수 문자가 포함된 명령은 실행할 수 없습니다"
    
    return True, raw_cmd

@app.websocket("/stream/execute")
async def handle_stream(websocket: WebSocket):
    await pool.attach(websocket)
    
    try:
        while True:
            msg = await websocket.receive_json()
            operation = msg.get("operation", "")
            
            is_valid, result = sanitize_operation(operation)
            if not is_valid:
                await websocket.send_json({
                    "type": "error",
                    "content": result
                })
                continue
            
            await websocket.send_json({
                "type": "start",
                "content": f"$ {operation}"
            })
            
            # 서브프로세스 생성
            proc = await asyncio.create_subprocess_exec(
                *shlex.split(result),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            # 출력 스트리밍
            async def relay_stream(pipe, label):
                accumulated = 0
                while True:
                    try:
                        chunk = await asyncio.wait_for(
                            pipe.readline(), timeout=30.0
                        )
                        if not chunk:
                            break
                        decoded = chunk.decode('utf-8', errors='replace').rstrip()
                        accumulated += len(decoded)
                        if accumulated > MAX_OUTPUT_LENGTH:
                            await websocket.send_json({
                                "type": "warning",
                                "content": "출력 길이 제한 초과, 이후 내용 생략"
                            })
                            break
                        await websocket.send_json({
                            "type": "chunk",
                            "content": f"[{label}] {decoded}"
                        })
                    except asyncio.TimeoutError:
                        await websocket.send_json({
                            "type": "error",
                            "content": f"{label} 스트림 타임아웃"
                        })
                        break
            
            # stdout, stderr 동시 처리
            await asyncio.gather(
                relay_stream(proc.stdout, "OUT"),
                relay_stream(proc.stderr, "ERR")
            )
            
            await proc.wait()
            await websocket.send_json({
                "type": "complete",
                "content": f"종료 코드: {proc.returncode}"
            })
            
    except WebSocketDisconnect:
        pool.detach(websocket)
    except Exception as e:
        await websocket.send_json({
            "type": "error",
            "content": f"서버 오류: {str(e)}"
        })
        pool.detach(websocket)

보안 강화 방안

단순한 문자열 비교를 넘어 다층적 검증을 적용한다. shlex로 파싱하여 첫 토큰만 명령어로 인식하고, 셸 메타문자를 사전에 차단한다. 추가로 실행 환경을 격리하는 방식도 고려할 수 있다.

import subprocess
from pathlib import Path

async def run_in_sandbox(cmd: str, work_dir: Path):
    """제한된 환경에서 명령 실행"""
    return await asyncio.create_subprocess_exec(
        *shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=str(work_dir),
        env={"PATH": "/usr/bin:/bin"},  # 최소 환경 변수
        user="nobody"  # 권한 하락 (Linux)
    )

성능 최적화

대용량 로그 처리를 위해 백프레셔(backpressure) 메커니즘을 도입한다. 클라이언트 처리 속도를 고려하여 버퍼링 후 배치 전송하거나, 출력 압축을 적용할 수 있다.

import zlib
from collections import deque

class BufferedStreamer:
    def __init__(self, websocket: WebSocket, flush_interval: float = 0.1):
        self.ws = websocket
        self.buffer = deque()
        self.last_flush = asyncio.get_event_loop().time()
        self.interval = flush_interval
    
    async def append(self, data: str):
        self.buffer.append(data)
        now = asyncio.get_event_loop().time()
        if now - self.last_flush >= self.interval or len(self.buffer) >= 50:
            await self._flush()
    
    async def _flush(self):
        if not self.buffer:
            return
        batch = "\n".join(self.buffer)
        compressed = zlib.compress(batch.encode(), level=6)
        await self.ws.send_bytes(compressed)
        self.buffer.clear()
        self.last_flush = asyncio.get_event_loop().time()

확장: 배치 작업 모니터링

장시간 실행되는 작업의 경우 진행 상황을 추적하는 구조를 추가한다. 작업 ID를 발급하고 상태 조회 엔드포인트를 별도 제공한다.

from dataclasses import dataclass, field
from datetime import datetime
import uuid

@dataclass
class BackgroundTask:
    task_id: str
    command: str
    status: str = "pending"  # pending, running, completed, failed
    logs: list = field(default_factory=list)
    started_at: datetime = field(default_factory=datetime.now)
    finished_at: datetime = None

task_registry: dict[str, BackgroundTask] = {}

@app.post("/tasks")
async def enqueue_task(cmd: str):
    task = BackgroundTask(
        task_id=str(uuid.uuid4())[:8],
        command=cmd
    )
    task_registry[task.task_id] = task
    
    # 백그라운드 실행
    asyncio.create_task(execute_background(task))
    return {"task_id": task.task_id, "status": "queued"}

@app.get("/tasks/{task_id}")
async def query_progress(task_id: str):
    if task_id not in task_registry:
        return {"error": "작업을 찾을 수 없습니다"}
    task = task_registry[task_id]
    return {
        "command": task.command,
        "status": task.status,
        "log_count": len(task.logs),
        "recent_logs": task.logs[-20:],
        "duration": (task.finished_at or datetime.now()) - task.started_at
    }

async def execute_background(task: BackgroundTask):
    task.status = "running"
    proc = await asyncio.create_subprocess_shell(
        task.command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT
    )
    async for line in proc.stdout:
        task.logs.append(line.decode().strip())
    await proc.wait()
    task.status = "completed" if proc.returncode == 0 else "failed"
    task.finished_at = datetime.now()

태그: Ant Design FastAPI websocket asyncio React

5월 30일 05:39에 게시됨