Java ThreadPoolExecutor 고성능 트레이락과 동시성 제어

Executor 프레임워크 핵심 메커니즘

ThreadPoolExecutor는 작업 제출과 실행을 분리하는 생산자-소비자 패턴을 구현한다. 5가지 핵심 구성요소는 다음과 같다.

  1. corePoolSize: 유휴 상태라도 제거되지 않는 기본 스레드 수
  2. maximumPoolSize: 부하 급증 시 확장 가능한 상한선
  3. keepAliveTime: corePoolSize 초과 생성된 스레드의 유휴 대기 시간
  4. workQueue: Runnable 객체를 임시 보관하는 버퍼 (예: LinkedBlockingDeque, PriorityBlockingQueue)
  5. handler: 수용 한도 초과 시 위임 처리 로직

팩토리 메서드별 특성 비교

메서드coremax큐 특성적합한 환경
newCachedThreadPool0Integer.MAX_VALUESynchronousQueue (0용량)짧은 지속 시간의 비동기 콜백
newFixedThreadPoolnn무한 LinkedBlockingQueue안정적인 처리율이 중요한 백엔드
newSingleThreadExecutor11무한 큐순서 보장이 필요한 이벤트 처리
newScheduledThreadPoolnInteger.MAX_VALUEDelayedWorkQueue주기적/지연 실행 작업

동적 파라미터 튜닝 전략

스레드 수 산출 공식을 업무 특성에 맞게 변형해 적용한다.

// CPU 집약형: 코어 수 + 1 (컨텍스트 스위칭 오버헤드 고려)
int cpuBound = Runtime.getRuntime().availableProcessors() + 1;

// IO 집약형: 코어 수 * (1 + 평균 대기시간 / 평균 처리시간)
int ioBound = (int)(cores * (1 + waitTime / computeTime));

// 혼합형: CPU 코어 수의 2~4배에서 시작해 프로파일링
int mixed = cores * 4;

큐 크기는 메모리와 처리량의 균형점에서 설정한다. 경험상 (목표 TPS × 허용 대기 시간) / 평균 처리 시간으로 초기값을 잡고 부하 테스트로 검증한다.

커스텀 큐 설계 패턴

기본 제공 큐로는 해결 어려운 시나리오를 위해 직접 구현할 수 있다.

public class TieredBlockingQueue<T> extends AbstractQueue<T> 
        implements BlockingQueue<T> {
    
    private final BlockingQueue<T> urgentLane;
    private final BlockingQueue<T> normalLane;
    private final AtomicInteger urgentThreshold;
    
    public TieredBlockingQueue(int urgentCap, int normalCap, int threshold) {
        this.urgentLane = new ArrayBlockingQueue<>(urgentCap);
        this.normalLane = new LinkedBlockingQueue<>(normalCap);
        this.urgentThreshold = new AtomicInteger(threshold);
    }
    
    @Override
    public boolean offer(T element) {
        if (element instanceof Prioritized && 
            ((Prioritized) element).level() > urgentThreshold.get()) {
            return urgentLane.offer(element);
        }
        return normalLane.offer(element);
    }
    
    @Override
    public T poll() {
        T item = urgentLane.poll();
        return item != null ? item : normalLane.poll();
    }
    
    // size, peek, iterator 등 생략
}

거부 정책 커스텀 구현

표준 4가지 전략 외에 비즈니스 요구에 맞는 핸들러를 정의한다.

public class BackpressurePolicy implements RejectedExecutionHandler {
    private final MeterRegistry metrics;
    
    public BackpressurePolicy(MeterRegistry metrics) {
        this.metrics = metrics;
    }
    
    @Override
    public void rejected(Runnable r, ThreadPoolExecutor executor) {
        metrics.counter("pool.rejection").increment();
        
        if (r instanceof DeferredTask) {
            // 일정 시간 후 재시도 큐로 이관
            ((DeferredTask) r).rescheduleAfter(Duration.ofMillis(100));
        } else {
            // 호출자에게 즉시 피드백
            throw new PoolExhaustedException("임계점 도달, 요청 거부됨");
        }
    }
}

고부하 장비에서의 분산 처리

대용량 데이터 처리 시 단일 풀 병목을 하기 위한 샤딩 접근법.

public class ShardedExecutor {
    private final ThreadPoolExecutor[] partitions;
    private final AtomicLong sequencer = new AtomicLong();
    
    public ShardedExecutor(int shardCount, int perShardThreads) {
        partitions = new ThreadPoolExecutor[shardCount];
        for (int i = 0; i < shardCount; i++) {
            partitions[i] = new ThreadPoolExecutor(
                perShardThreads, perShardThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(256),
                new ThreadFactoryBuilder().setNameFormat("shard-" + i + "-%d").build()
            );
        }
    }
    
    public Future<?> dispatch(Runnable task) {
        long idx = sequencer.getAndIncrement() & (partitions.length - 1);
        return partitions[(int) idx].submit(task);
    }
    
    public void shutdownGracefully() {
        for (ThreadPoolExecutor pool : partitions) {
            pool.shutdown();
        }
        // 생략: awaitTermination 처리
    }
}

비동기 파이프라인과 조합

CompletableFuture를 활용한 다단계 비동기 체인 구성.

public class AsyncPipeline {
    private final Executor fetchStage;
    private final Executor computeStage;
    private final Executor persistStage;
    
    public CompletableFuture<Result> process(Request req) {
        return CompletableFuture
            .supplyAsync(() -> fetchFromRemote(req), fetchStage)
            .thenApplyAsync(this::transform, computeStage)
            .thenComposeAsync(data -> 
                CompletableFuture.supplyAsync(() -> saveToStore(data), persistStage)
                    .thenApply(v -> new Result(data, v))
            );
    }
    
    private Data fetchFromRemote(Request r) { /* NIO 기반 논블로킹 IO */ return null; }
    private Data transform(Data d) { /* CPU 집약 변환 */ return d; }
    private boolean saveToStore(Data d) { /* DB 기록 */ return true; }
}

운영 환경 사례: 대규모 주문 처리

문제 상황: 트래픽 급증 시 Executors.newCachedThreadPool 사용으로 스레드 20,000개 생성, 시스템 교착.

개선 구성:

ThreadPoolExecutor orderProcessor = new ThreadPoolExecutor(
    200, 400,
    60L, TimeUnit.SECONDS,
    new LinkedTransferQueue<>() {
        @Override
        public boolean offer(Runnable e) {
            // 거부 없이 호출자 블로킹 유도
            try {
                transfer(e);
                return true;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    },
    new CustomThreadFactory("order-pool"),
    new BackpressurePolicy(meterRegistry)
);

부가 조치:

  • Redis 캐싱 계층 도입으로 DB 직접 접근 70% 감소
  • Kafka 버퍼링으로 급격한 트래픽 스파이크 흡수
  • Prometheus 기반 queueSize / activeCount / completedTasks 모니터링

결과: 피크 스레드 수 450개로 안정화, P99 지연 2.3초 → 180ms.

잠재적 위험 요소와 대응

증상원인해결책
메모리 부족 (OOM)무제한 큐에 작업 무한 누적ArrayBlockingQueue + 적절한 capacity
스레드 기아긴 작업壟占 공정 큐별도 풀 분리 또는 우선순위 큐
교착 상태작업 내부에서 외부 동기화 블록 호출비동기 백 패턴으로 대체
리소스 누출풀 미종료로 스레드 잔존ShutdownHook 또는 Spring DisposableBean
디버깅 곤란기본 스레드명 (pool-N-thread-M)의미 있는 ThreadFactory 구현

고급 최적화 기법

스핀 기반 대기: 짧은 임계구간에서 락 프리 구조로 전환.

public class SpinningBackoffQueue<T> extends LinkedTransferQueue<T> {
    private static final int SPIN_TRIES = 100;
    
    @Override
    public T poll() {
        T item;
        for (int i = 0; i < SPIN_TRIES; i++) {
            if ((item = super.poll()) != null) return item;
            Thread.onSpinWait(); // Java 9+
        }
        return super.poll(); // 최종 fallback
    }
}

CPU 친화도 바인딩: NUMA 환경에서 캐시 효율 극대화.

public void bindToCore(Thread t, int coreId) {
    // Linux-specific: sched_setaffinity 시스템 콜
    // Java 21 가상 스레드에서는 플랫폼 스레드 레벨에서 적용
    ProcessHandle.current().info().command().ifPresent(cmd -> {
        // jnr-ffi 또는 ProcessBuilder로 native 호출
    });
}

워크스틸링 큐: ForkJoinPool의 기본 전략을 커스텀 풀에 적용.

public class WorkStealingPool {
    private final ForkJoinPool adaptivePool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null, true // asyncMode: FIFO for event-style tasks
    );
    
    public void submitBatch(List<Task> tasks) {
        adaptivePool.invoke(new RecursiveAction() {
            @Override
            protected void compute() {
                tasks.parallelStream().forEach(Task::execute);
            }
        });
    }
}

관측 도구 및 방법론

도구활용 포인트
async-profilerCPU 샘플링 + lock contention 동시 분석
JFR (JDK Flight Recorder)스레드 블로킹 이벤트, park/parkNanos 상세 추적
Micrometer + Prometheusexecutor_pool_size, executor_queue_remaining 커스텀 메트릭
JMH큐 구현체별 throughput/latency 벤치마크
Arthas thread실시간 스레드 상태 분포, 블로킹 위치 식별

성능 개선은 단일 파라미터 조정이 아닌, 큐 전략-거부 정책-모니터링-비즈니스 아키텍처의 연속된 설계 과정에서 나온다. 실제 적용 시에는 점진적 배포와 지표 기반 검증을 병행할 것.

태그: ThreadPoolExecutor CompletableFuture ForkJoinPool BlockingQueue RejectedExecutionHandler

7월 2일 16:54에 게시됨