병렬 세계 속의 자바 프로그래밍
현실 세계는 본질적으로 동시적이다. 다양한 사건이 동시에 발생하며, 이 특성을 반영하여 소프트웨어도 병렬로 동작하도록 설계된다. JVM 위에서 실행되는 자바 코드는 이러한 병렬성을 효과적으로 활용할 수 있다. 반면, 일부 언어는 GIL(Global Interpreter Lock)과 같은 제약으로 인해 진정한 병렬성을 달성하기 어렵다.
하지만 인간의 뇌는 복잡한 동시성 로직을 직관적으로 파악하는 데 한계가 있다. 공유 자원에 대한 경쟁 조건, 스레드 간 통신, 예외 상황 처리 등은 오류를 유발하기 쉬운 영역이다. 이를 해결하기 위해 개발자들은 다양한 동시성 모델을 고안했다. 이 글에서는 동일한 문제를 네 가지 서로 다른 패러다임으로 해결하며 각각의 장단점을 살펴본다.
- 직접 관리하는 스레드 (Raw Threads)
- 실행기 서비스 (ExecutorService)
- 포크조인 프레임워크와 병렬 스트림
- 액터 모델 (Actor Model)
공통 작업 정의
여러 검색 엔진 URL 목록과 검색어를 입력받아, 각 엔진에 HTTP 요청을 보내고 가장 먼저 도착한 응답 하나를 반환하는 메서드를 구현한다. 응답 대기 중 무한 루프에 빠지지 않도록 적절한 종료 조건이 필요하다.
방법 1: 순수 스레드 사용
가장 기본적인 방법으로, Thread 객체를 직접 생성하고 시작한다. 각 검색 엔진마다 독립된 스레드를 할당하여 비동기 요청을 보낸다. 결과는 스레드 안전한 AtomicReference에 저장되며, 첫 번째로 기록된 값만 유지된다.
private static String fetchFirstResponse(String query, List<String> endpoints) {
AtomicReference<String> response = new AtomicReference<>();
for (String endpoint : endpoints) {
String requestUrl = endpoint + query;
new Thread(() -> {
try {
String result = HttpUtil.fetch(requestUrl);
response.compareAndSet(null, result);
} catch (Exception e) {
// 실패 시 무시
}
}).start();
}
while (response.get() == null) {
// 첫 결과를 기다림
}
return response.get();
}
이 방식은 개념적으로 단순하며 하드웨어와 가까운 추상화를 제공한다. 그러나 스레드는 리소스를 많이 소비하므로 지나치게 많은 스레드를 생성하면 성능 저하 및 메모리 부족 문제가 발생할 수 있다. 또한 스레드 생명주기를 직접 관리해야 하며, 종료 신호 전달 등의 로직을 수동으로 구현해야 한다.
방법 2: ExecutorService와 CompletionService 활용
스레드 풀을 통해 효율적인 자원 관리를 가능하게 하는 ExecutorService를 사용한다. 특히 ExecutorCompletionService는 완료된 작업을 큐에서 가져올 수 있도록 하여, 가장 먼저 끝난 작업의 결과를 즉시 반환할 수 있게 해준다.
private static String fetchWithExecutor(String query, List<String> endpoints) {
ExecutorService pool = Executors.newFixedThreadPool(4);
ExecutorCompletionService<String> completion = new ExecutorCompletionService<>(pool);
for (String endpoint : endpoints) {
String url = endpoint + query;
completion.submit(() -> HttpUtil.fetch(url));
}
try {
Future<String> firstResult = completion.take(); // 첫 완료 작업 대기
return firstResult.get();
} catch (InterruptedException | ExecutionException e) {
return null;
} finally {
pool.shutdown();
}
}
이 방식은 스레드 수를 제어할 수 있어 안정성이 높으며, 작업 큐를 통해 과부하를 완화할 수 있다. 하지만 스레드 풀 크기, 큐 용량, 거부 정책 등을 신중히 결정해야 하며, 잘못된 설정은 성능 저하로 이어질 수 있다.
방법 3: 포크조인 및 병렬 스트림
자바 8부터 도입된 병렬 스트림은 내부적으로 ForkJoinPool을 사용하여 요소들을 분할하고 병렬로 처리한다. 함수형 스타일로 코드를 작성할 수 있으며, 문법이 간결하다.
private static String fetchParallelStream(String query, List<String> endpoints) {
Optional<String> result = endpoints.parallelStream()
.map(endpoint -> endpoint + query)
.map(HttpUtil::fetch)
.findAny();
return result.orElse(null);
}
이 방식은 구현이 매우 간단하지만, 몇 가지 함정이 있다. 우선 모든 작업이 ForkJoinPool.commonPool()을 공유하므로, 다른 라이브러리나 컴포넌트의 장시간 작업이 전체 애플리케이션의 성능을 저하시킬 수 있다. 또한 실제 병렬화 여부는 데이터 소스의 특성에 따라 달라질 수 있으므로 예측하기 어렵다.
방법 4: 액터 모델 (Akka 기반)
액터는 상태와 행동을 캡슐화한 독립된 계산 단위로, 메시지를 통해 통신한다. JDK에는 내장되어 있지 않지만, Akka와 같은 외부 라이브러리를 사용하면 구현할 수 있다. 액터는 공유 상태를 피하고 메시지 기반 비동기 통신을 통해 견고한 시스템을 구성할 수 있다.
public static class FetchTask {
public final String url;
public FetchTask(String url) { this.url = url; }
}
public static class Response {
public final String content;
public Response(String content) { this.content = content; }
}
public static class Worker extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(FetchTask.class, task -> {
String data = HttpUtil.fetch(task.url);
getSender().tell(new Response(data), getSelf());
})
.build();
}
}
public static class Coordinator extends AbstractActor {
private final String query;
private final List<String> engines;
private final AtomicReference<String> result;
public Coordinator(String query, List<String> engines, AtomicReference<String> result) {
this.query = query;
this.engines = engines;
this.result = result;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Response.class, res -> {
result.compareAndSet(null, res.content);
getContext().stop(getSelf());
})
.matchEquals("start", msg -> {
for (String engine : engines) {
String url = engine + query;
ActorRef worker = getContext().actorOf(Props.create(Worker.class));
worker.tell(new FetchTask(url), getSelf());
}
})
.build();
}
}
private static String fetchUsingActors(String query, List<String> engines) {
ActorSystem system = ActorSystem.create("SearchSystem");
AtomicReference<String> result = new AtomicReference<>();
ActorRef master = system.actorOf(
Props.create(() -> new Coordinator(query, engines, result)), "coordinator");
master.tell("start", ActorRef.noSender());
while (result.get() == null) {
// 첫 응답 대기
}
system.terminate();
return result.get();
}
Akka는 내부적으로 ForkJoinPool을 사용하여 효율적인 스레드 활용을 제공하며, 감독 전략을 통해 장애 복구를 쉽게 구현할 수 있다. 다만, 메시지 기반 설계는 새로운 사고 방식을 요구하며, 초기 도입 비용이 높을 수 있다.