Executor 프레임워크 핵심 메커니즘
ThreadPoolExecutor는 작업 제출과 실행을 분리하는 생산자-소비자 패턴을 구현한다. 5가지 핵심 구성요소는 다음과 같다.
- corePoolSize: 유휴 상태라도 제거되지 않는 기본 스레드 수
- maximumPoolSize: 부하 급증 시 확장 가능한 상한선
- keepAliveTime: corePoolSize 초과 생성된 스레드의 유휴 대기 시간
- workQueue: Runnable 객체를 임시 보관하는 버퍼 (예: LinkedBlockingDeque, PriorityBlockingQueue)
- handler: 수용 한도 초과 시 위임 처리 로직
팩토리 메서드별 특성 비교
| 메서드 | core | max | 큐 특성 | 적합한 환경 |
|---|---|---|---|---|
| newCachedThreadPool | 0 | Integer.MAX_VALUE | SynchronousQueue (0용량) | 짧은 지속 시간의 비동기 콜백 |
| newFixedThreadPool | n | n | 무한 LinkedBlockingQueue | 안정적인 처리율이 중요한 백엔드 |
| newSingleThreadExecutor | 1 | 1 | 무한 큐 | 순서 보장이 필요한 이벤트 처리 |
| newScheduledThreadPool | n | Integer.MAX_VALUE | DelayedWorkQueue | 주기적/지연 실행 작업 |
동적 파라미터 튜닝 전략
스레드 수 산출 공식을 업무 특성에 맞게 변형해 적용한다.
// CPU 집약형: 코어 수 + 1 (컨텍스트 스위칭 오버헤드 고려)
int cpuBound = Runtime.getRuntime().availableProcessors() + 1;
// IO 집약형: 코어 수 * (1 + 평균 대기시간 / 평균 처리시간)
int ioBound = (int)(cores * (1 + waitTime / computeTime));
// 혼합형: CPU 코어 수의 2~4배에서 시작해 프로파일링
int mixed = cores * 4;
큐 크기는 메모리와 처리량의 균형점에서 설정한다. 경험상 (목표 TPS × 허용 대기 시간) / 평균 처리 시간으로 초기값을 잡고 부하 테스트로 검증한다.
커스텀 큐 설계 패턴
기본 제공 큐로는 해결 어려운 시나리오를 위해 직접 구현할 수 있다.
public class TieredBlockingQueue<T> extends AbstractQueue<T>
implements BlockingQueue<T> {
private final BlockingQueue<T> urgentLane;
private final BlockingQueue<T> normalLane;
private final AtomicInteger urgentThreshold;
public TieredBlockingQueue(int urgentCap, int normalCap, int threshold) {
this.urgentLane = new ArrayBlockingQueue<>(urgentCap);
this.normalLane = new LinkedBlockingQueue<>(normalCap);
this.urgentThreshold = new AtomicInteger(threshold);
}
@Override
public boolean offer(T element) {
if (element instanceof Prioritized &&
((Prioritized) element).level() > urgentThreshold.get()) {
return urgentLane.offer(element);
}
return normalLane.offer(element);
}
@Override
public T poll() {
T item = urgentLane.poll();
return item != null ? item : normalLane.poll();
}
// size, peek, iterator 등 생략
}
거부 정책 커스텀 구현
표준 4가지 전략 외에 비즈니스 요구에 맞는 핸들러를 정의한다.
public class BackpressurePolicy implements RejectedExecutionHandler {
private final MeterRegistry metrics;
public BackpressurePolicy(MeterRegistry metrics) {
this.metrics = metrics;
}
@Override
public void rejected(Runnable r, ThreadPoolExecutor executor) {
metrics.counter("pool.rejection").increment();
if (r instanceof DeferredTask) {
// 일정 시간 후 재시도 큐로 이관
((DeferredTask) r).rescheduleAfter(Duration.ofMillis(100));
} else {
// 호출자에게 즉시 피드백
throw new PoolExhaustedException("임계점 도달, 요청 거부됨");
}
}
}
고부하 장비에서의 분산 처리
대용량 데이터 처리 시 단일 풀 병목을 하기 위한 샤딩 접근법.
public class ShardedExecutor {
private final ThreadPoolExecutor[] partitions;
private final AtomicLong sequencer = new AtomicLong();
public ShardedExecutor(int shardCount, int perShardThreads) {
partitions = new ThreadPoolExecutor[shardCount];
for (int i = 0; i < shardCount; i++) {
partitions[i] = new ThreadPoolExecutor(
perShardThreads, perShardThreads,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(256),
new ThreadFactoryBuilder().setNameFormat("shard-" + i + "-%d").build()
);
}
}
public Future<?> dispatch(Runnable task) {
long idx = sequencer.getAndIncrement() & (partitions.length - 1);
return partitions[(int) idx].submit(task);
}
public void shutdownGracefully() {
for (ThreadPoolExecutor pool : partitions) {
pool.shutdown();
}
// 생략: awaitTermination 처리
}
}
비동기 파이프라인과 조합
CompletableFuture를 활용한 다단계 비동기 체인 구성.
public class AsyncPipeline {
private final Executor fetchStage;
private final Executor computeStage;
private final Executor persistStage;
public CompletableFuture<Result> process(Request req) {
return CompletableFuture
.supplyAsync(() -> fetchFromRemote(req), fetchStage)
.thenApplyAsync(this::transform, computeStage)
.thenComposeAsync(data ->
CompletableFuture.supplyAsync(() -> saveToStore(data), persistStage)
.thenApply(v -> new Result(data, v))
);
}
private Data fetchFromRemote(Request r) { /* NIO 기반 논블로킹 IO */ return null; }
private Data transform(Data d) { /* CPU 집약 변환 */ return d; }
private boolean saveToStore(Data d) { /* DB 기록 */ return true; }
}
운영 환경 사례: 대규모 주문 처리
문제 상황: 트래픽 급증 시 Executors.newCachedThreadPool 사용으로 스레드 20,000개 생성, 시스템 교착.
개선 구성:
ThreadPoolExecutor orderProcessor = new ThreadPoolExecutor(
200, 400,
60L, TimeUnit.SECONDS,
new LinkedTransferQueue<>() {
@Override
public boolean offer(Runnable e) {
// 거부 없이 호출자 블로킹 유도
try {
transfer(e);
return true;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return false;
}
}
},
new CustomThreadFactory("order-pool"),
new BackpressurePolicy(meterRegistry)
);
부가 조치:
- Redis 캐싱 계층 도입으로 DB 직접 접근 70% 감소
- Kafka 버퍼링으로 급격한 트래픽 스파이크 흡수
- Prometheus 기반 queueSize / activeCount / completedTasks 모니터링
결과: 피크 스레드 수 450개로 안정화, P99 지연 2.3초 → 180ms.
잠재적 위험 요소와 대응
| 증상 | 원인 | 해결책 |
|---|---|---|
| 메모리 부족 (OOM) | 무제한 큐에 작업 무한 누적 | ArrayBlockingQueue + 적절한 capacity |
| 스레드 기아 | 긴 작업壟占 공정 큐 | 별도 풀 분리 또는 우선순위 큐 |
| 교착 상태 | 작업 내부에서 외부 동기화 블록 호출 | 비동기 백 패턴으로 대체 |
| 리소스 누출 | 풀 미종료로 스레드 잔존 | ShutdownHook 또는 Spring DisposableBean |
| 디버깅 곤란 | 기본 스레드명 (pool-N-thread-M) | 의미 있는 ThreadFactory 구현 |
고급 최적화 기법
스핀 기반 대기: 짧은 임계구간에서 락 프리 구조로 전환.
public class SpinningBackoffQueue<T> extends LinkedTransferQueue<T> {
private static final int SPIN_TRIES = 100;
@Override
public T poll() {
T item;
for (int i = 0; i < SPIN_TRIES; i++) {
if ((item = super.poll()) != null) return item;
Thread.onSpinWait(); // Java 9+
}
return super.poll(); // 최종 fallback
}
}
CPU 친화도 바인딩: NUMA 환경에서 캐시 효율 극대화.
public void bindToCore(Thread t, int coreId) {
// Linux-specific: sched_setaffinity 시스템 콜
// Java 21 가상 스레드에서는 플랫폼 스레드 레벨에서 적용
ProcessHandle.current().info().command().ifPresent(cmd -> {
// jnr-ffi 또는 ProcessBuilder로 native 호출
});
}
워크스틸링 큐: ForkJoinPool의 기본 전략을 커스텀 풀에 적용.
public class WorkStealingPool {
private final ForkJoinPool adaptivePool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true // asyncMode: FIFO for event-style tasks
);
public void submitBatch(List<Task> tasks) {
adaptivePool.invoke(new RecursiveAction() {
@Override
protected void compute() {
tasks.parallelStream().forEach(Task::execute);
}
});
}
}
관측 도구 및 방법론
| 도구 | 활용 포인트 |
|---|---|
| async-profiler | CPU 샘플링 + lock contention 동시 분석 |
| JFR (JDK Flight Recorder) | 스레드 블로킹 이벤트, park/parkNanos 상세 추적 |
| Micrometer + Prometheus | executor_pool_size, executor_queue_remaining 커스텀 메트릭 |
| JMH | 큐 구현체별 throughput/latency 벤치마크 |
| Arthas thread | 실시간 스레드 상태 분포, 블로킹 위치 식별 |
성능 개선은 단일 파라미터 조정이 아닌, 큐 전략-거부 정책-모니터링-비즈니스 아키텍처의 연속된 설계 과정에서 나온다. 실제 적용 시에는 점진적 배포와 지표 기반 검증을 병행할 것.