SpringBoot 비동기 처리와 세마포어를 활용한 동시성 제어

세마포어(Semaphore)는 다중 스레드 환경에서 공유 자원에 대한 동시 접근을 제어하는 동기화 메커니즘입니다. 허용된 개수만큼의 스레드만 자원에 접근할 수 있도록 관리하며, 초과하는 요청은 대기 상태로 전환됩니다.

실무에서는 외부 API 호출, 데이터베이스 연결, 파일 I/O 등 리소스 집약적인 작업에 동시 접근 제한을 적용하여 시스템 안정성을 확보합니다. 예를 들어 특정 서비스 메서드에 최대 5개의 스레드만 동시 실행되도록 제약을 설정할 수 있습니다.

시나리오 설계

다음과 같은 요구사항을 구현합니다:

  • SharedWorker: 공유 자원으로 30초간 처리되는 작업 수행
  • SyncInvoker: 동기 방식으로 SharedWorker 호출
  • AsyncInvoker: 비동기 방식으로 SharedWorker 호출, 최대 10개 스레드 병렬 실행

핵심 제약조건: SharedWorker에 대한 전역 동시 호출 수는 10개로 엄격히 제한되며, 이는 동기/비동기 호출 모두에 적용됩니다.

스레드 풀 및 세마포어 구성

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

@Configuration
@EnableAsync
public class ConcurrencyConfig {

    @Bean(name = "boundedExecutor")
    public Executor boundedExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(15);
        pool.setMaxPoolSize(40);
        pool.setQueueCapacity(200);
        pool.setKeepAliveSeconds(120);
        pool.setThreadNamePrefix("Worker-");
        pool.setRejectedExecutionHandler(
            new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()
        );
        pool.initialize();
        return pool;
    }

    @Bean
    public Semaphore globalThrottle() {
        return new Semaphore(10, true); // 공정성 보장, 최대 10개 동시 실행
    }
}

공유 자원 서비스 구현

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Component
public class SharedWorker {
    private static final Logger log = LoggerFactory.getLogger(SharedWorker.class);

    @Autowired
    private Semaphore globalThrottle;

    public void executeHeavyTask(String requestId) {
        boolean permitAcquired = false;
        try {
            log.info("[{}] 퍼미트 대기 중...", requestId);
            globalThrottle.acquire();
            permitAcquired = true;
            
            log.info("[{}] 작업 시작", requestId);
            TimeUnit.SECONDS.sleep(30);
            log.info("[{}] 작업 완료", requestId);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("작업 중 인터럽트 발생: " + requestId, e);
        } finally {
            if (permitAcquired) {
                globalThrottle.release();
                log.info("[{}] 퍼미트 반환", requestId);
            }
        }
    }
}

동기 호출 서비스

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SyncInvoker {
    private static final Logger log = LoggerFactory.getLogger(SyncInvoker.class);

    @Autowired
    private SharedWorker worker;

    public void invokeDirectly(String payload) {
        log.info("동기 호출 시작: {}", payload);
        worker.executeHeavyTask("SYNC-" + payload);
        log.info("동기 호출 종료: {}", payload);
    }
}

비동기 호출 서비스

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncInvoker {
    private static final Logger log = LoggerFactory.getLogger(AsyncInvoker.class);

    @Autowired
    private SharedWorker worker;

    @Async("boundedExecutor")
    public void invokeInBackground(String payload) {
        log.info("비동기 호출 시작: {}", payload);
        worker.executeHeavyTask("ASYNC-" + payload);
        log.info("비동기 호출 종료: {}", payload);
    }
}

테스트용 REST 컨트롤러

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.stream.IntStream;

@RestController
@RequestMapping("/concurrency-test")
public class LoadTestController {
    private static final Logger log = LoggerFactory.getLogger(LoadTestController.class);

    @Autowired
    private SyncInvoker syncInvoker;
    
    @Autowired
    private AsyncInvoker asyncInvoker;

    @PostMapping("/sync-single")
    public String triggerSync(@RequestParam String data) {
        syncInvoker.invokeDirectly(data);
        return "동기 요청 처리 완료";
    }

    @PostMapping("/async-bulk")
    public String triggerAsyncBatch(@RequestParam(defaultValue = "20") int count) {
        IntStream.range(0, count).forEach(i -> 
            asyncInvoker.invokeInBackground("Batch-" + i)
        );
        return count + "개 비동기 요청 발행 완료";
    }

    @PostMapping("/mixed-load")
    public String triggerMixedLoad() {
        // 동기 5개 + 비동기 15개 = 총 20개 요청, 세마포어 10개 제한 적용
        IntStream.range(0, 5).forEach(i -> 
            new Thread(() -> syncInvoker.invokeDirectly("Mixed-" + i)).start()
        );
        
        IntStream.range(5, 20).forEach(i -> 
            asyncInvoker.invokeInBackground("Mixed-" + i)
        );
        
        return "혼합 부하 테스트 시작";
    }
}

동시성 제어 원리

세마포어 기반 제한이 적용되는 방식:

호출 유형스레드 풀 영향세마포어 영향
SyncInvoker호출자 스레드 직접 사용10개 중 1개 퍼미트 획득
AsyncInvokerboundedExecutor 스레드 할당10개 중 1개 퍼미트 획득

비동기 실행 시 스레드 풀의 가용 스레드 수(15개)와 세마포어 허용량(10개) 중 작은 값인 10개가 실제 병렬 처리 한계가 됩니다. 세마포어 퍼미트가 모두 점유된 상태에서 추가 요청이 발생하면, 해당 스레드는 acquire()에서 블로킹됩니다.

세마포어 vs 스레드 풀 크기 설정

효과적인 동시성 제어를 위한 권장 구성:

  • 세마포어 허용량 ≤ 스레드 풀 최대 크기: 세마포어가 병목 지점
  • 세마포어 허용량 > 스레드 풀 최대 크기: 스레드 풀이 병목 지점
  • 본 예제: 세마포어(10) < 풀 코어(15) < 풀 최대(40) 구조로 세마포어 중심 제어

세마포어 획득 타임아웃이 필요한 경우 tryAcquire(long timeout, TimeUnit unit)를 활용하여 데드락 위험을 방지할 수 있습니다.

태그: Spring Boot @Async Semaphore ThreadPoolTaskExecutor Concurrency Control

5월 29일 08:32에 게시됨