세마포어(Semaphore)는 다중 스레드 환경에서 공유 자원에 대한 동시 접근을 제어하는 동기화 메커니즘입니다. 허용된 개수만큼의 스레드만 자원에 접근할 수 있도록 관리하며, 초과하는 요청은 대기 상태로 전환됩니다.
실무에서는 외부 API 호출, 데이터베이스 연결, 파일 I/O 등 리소스 집약적인 작업에 동시 접근 제한을 적용하여 시스템 안정성을 확보합니다. 예를 들어 특정 서비스 메서드에 최대 5개의 스레드만 동시 실행되도록 제약을 설정할 수 있습니다.
시나리오 설계
다음과 같은 요구사항을 구현합니다:
- SharedWorker: 공유 자원으로 30초간 처리되는 작업 수행
- SyncInvoker: 동기 방식으로 SharedWorker 호출
- AsyncInvoker: 비동기 방식으로 SharedWorker 호출, 최대 10개 스레드 병렬 실행
핵심 제약조건: SharedWorker에 대한 전역 동시 호출 수는 10개로 엄격히 제한되며, 이는 동기/비동기 호출 모두에 적용됩니다.
스레드 풀 및 세마포어 구성
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
@Configuration
@EnableAsync
public class ConcurrencyConfig {
@Bean(name = "boundedExecutor")
public Executor boundedExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(15);
pool.setMaxPoolSize(40);
pool.setQueueCapacity(200);
pool.setKeepAliveSeconds(120);
pool.setThreadNamePrefix("Worker-");
pool.setRejectedExecutionHandler(
new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()
);
pool.initialize();
return pool;
}
@Bean
public Semaphore globalThrottle() {
return new Semaphore(10, true); // 공정성 보장, 최대 10개 동시 실행
}
}
공유 자원 서비스 구현
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Component
public class SharedWorker {
private static final Logger log = LoggerFactory.getLogger(SharedWorker.class);
@Autowired
private Semaphore globalThrottle;
public void executeHeavyTask(String requestId) {
boolean permitAcquired = false;
try {
log.info("[{}] 퍼미트 대기 중...", requestId);
globalThrottle.acquire();
permitAcquired = true;
log.info("[{}] 작업 시작", requestId);
TimeUnit.SECONDS.sleep(30);
log.info("[{}] 작업 완료", requestId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("작업 중 인터럽트 발생: " + requestId, e);
} finally {
if (permitAcquired) {
globalThrottle.release();
log.info("[{}] 퍼미트 반환", requestId);
}
}
}
}
동기 호출 서비스
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SyncInvoker {
private static final Logger log = LoggerFactory.getLogger(SyncInvoker.class);
@Autowired
private SharedWorker worker;
public void invokeDirectly(String payload) {
log.info("동기 호출 시작: {}", payload);
worker.executeHeavyTask("SYNC-" + payload);
log.info("동기 호출 종료: {}", payload);
}
}
비동기 호출 서비스
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncInvoker {
private static final Logger log = LoggerFactory.getLogger(AsyncInvoker.class);
@Autowired
private SharedWorker worker;
@Async("boundedExecutor")
public void invokeInBackground(String payload) {
log.info("비동기 호출 시작: {}", payload);
worker.executeHeavyTask("ASYNC-" + payload);
log.info("비동기 호출 종료: {}", payload);
}
}
테스트용 REST 컨트롤러
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.stream.IntStream;
@RestController
@RequestMapping("/concurrency-test")
public class LoadTestController {
private static final Logger log = LoggerFactory.getLogger(LoadTestController.class);
@Autowired
private SyncInvoker syncInvoker;
@Autowired
private AsyncInvoker asyncInvoker;
@PostMapping("/sync-single")
public String triggerSync(@RequestParam String data) {
syncInvoker.invokeDirectly(data);
return "동기 요청 처리 완료";
}
@PostMapping("/async-bulk")
public String triggerAsyncBatch(@RequestParam(defaultValue = "20") int count) {
IntStream.range(0, count).forEach(i ->
asyncInvoker.invokeInBackground("Batch-" + i)
);
return count + "개 비동기 요청 발행 완료";
}
@PostMapping("/mixed-load")
public String triggerMixedLoad() {
// 동기 5개 + 비동기 15개 = 총 20개 요청, 세마포어 10개 제한 적용
IntStream.range(0, 5).forEach(i ->
new Thread(() -> syncInvoker.invokeDirectly("Mixed-" + i)).start()
);
IntStream.range(5, 20).forEach(i ->
asyncInvoker.invokeInBackground("Mixed-" + i)
);
return "혼합 부하 테스트 시작";
}
}
동시성 제어 원리
세마포어 기반 제한이 적용되는 방식:
| 호출 유형 | 스레드 풀 영향 | 세마포어 영향 |
|---|---|---|
| SyncInvoker | 호출자 스레드 직접 사용 | 10개 중 1개 퍼미트 획득 |
| AsyncInvoker | boundedExecutor 스레드 할당 | 10개 중 1개 퍼미트 획득 |
비동기 실행 시 스레드 풀의 가용 스레드 수(15개)와 세마포어 허용량(10개) 중 작은 값인 10개가 실제 병렬 처리 한계가 됩니다. 세마포어 퍼미트가 모두 점유된 상태에서 추가 요청이 발생하면, 해당 스레드는 acquire()에서 블로킹됩니다.
세마포어 vs 스레드 풀 크기 설정
효과적인 동시성 제어를 위한 권장 구성:
- 세마포어 허용량 ≤ 스레드 풀 최대 크기: 세마포어가 병목 지점
- 세마포어 허용량 > 스레드 풀 최대 크기: 스레드 풀이 병목 지점
- 본 예제: 세마포어(10) < 풀 코어(15) < 풀 최대(40) 구조로 세마포어 중심 제어
세마포어 획득 타임아웃이 필요한 경우 tryAcquire(long timeout, TimeUnit unit)를 활용하여 데드락 위험을 방지할 수 있습니다.