기존 Future 인터페이스의 한계
Java의 Future 인터페이스는 비동기 작업의 결과를 나타내고 관리하는 데 사용됩니다. FutureTask와 Callable을 결합하면 메인 스레드와 별도로 실행되는 백그라운드 작업을 생성할 수 있습니다. 그러나 기존 Future는 다음과 같은 명확한 한계를 가집니다.
1. get() 메서드의 블로킹 문제
get() 메서드를 호출하면 작업이 완료될 때까지 현재 스레드가 블로킹됩니다. 작업 완료 여부를 알 수 없는 상황에서 get()을 호출하면 메인 스레드의 자원이 낭비되며 전체 애플리케이션의 처리량이 떨어질 수 있습니다.
2. isDone() 폴링의 자원 소모
블로킹을 피하기 위해 isDone() 메서드를 반복적으로 호출하여 작업 완료 여부를 확인하는 폴링(Polling) 방식을 사용할 수 있습니다. 하지만 이 방식은 작업이 완료되지 않은 동안에도 CPU 사이클을 지속적으로 소모하여 시스템 전체의 성능을 저하시킵니다.
CompletableFuture의 등장과 핵심 개념
이러한 한계를 극복하기 위해 JDK 8에서는 CompletableFuture가 도입되었습니다. 이 클래스는 Future와 CompletionStage 인터페이스를 모두 구현하여, 블로킹이나 폴링 없이 콜백(Callback) 방식으로 비동기 작업의 완료 시점을 처리할 수 있게 해줍니다. 또한 함수형 프로그래밍 패러다임을 적용하여 여러 비동기 작업을 유연하게 조합하고 체이닝(Chaining)할 수 있는 강력한 API를 제공합니다.
비동기 작업 생성하기
CompletableFuture는 정적 팩토리 메서드를 통해 비동기 작업을 생성합니다. 반환 값의 유무와 사용자 정의 스레드 풀 사용 여부에 따라 네 가지 핵심 메서드로 나뉩니다. 명시적으로 스레드 풀을 지정하지 않으면 기본적으로 ForkJoinPool.commonPool()을 사용합니다.
반환 값이 없는 작업: runAsync
public class AsyncExecutionDemo {
public static void main(String[] args) throws Exception {
ExecutorService customPool = Executors.newFixedThreadPool(2);
// 기본 스레드 풀 사용
CompletableFuture<Void> defaultTask = CompletableFuture.runAsync(() -> {
System.out.println("Default Pool: " + Thread.currentThread().getName());
});
// 사용자 정의 스레드 풀 사용
CompletableFuture<Void> customTask = CompletableFuture.runAsync(() -> {
System.out.println("Custom Pool: " + Thread.currentThread().getName());
}, customPool);
defaultTask.get();
customTask.get();
customPool.shutdown();
}
}
반환 값이 있는 작업: supplyAsync
public class AsyncSupplyDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> dataFetchTask = CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching data on: " + Thread.currentThread().getName());
return "Payload-Data-123";
});
System.out.println("Result: " + dataFetchTask.get());
}
}
콜백을 통한 결과 및 예외 처리
whenComplete와 exceptionally 메서드를 사용하면 작업이 정상적으로 완료되었을 때와 예외가 발생했을 때의 로직을 분리하여 처리할 수 있습니다. 이를 통해 블로킹 없이 이벤트 기반의 후속 작업을 정의할 수 있습니다.
public class CallbackHandlingDemo {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newSingleThreadExecutor();
CompletableFuture.supplyAsync(() -> {
int randomValue = ThreadLocalRandom.current().nextInt(1, 10);
if (randomValue > 7) {
throw new RuntimeException("Value exceeded threshold: " + randomValue);
}
return randomValue;
}, pool)
.whenComplete((result, throwable) -> {
if (throwable == null) {
System.out.println("Successfully processed value: " + result);
}
})
.exceptionally(throwable -> {
System.err.println("Error captured: " + throwable.getMessage());
return -1; // Fallback value
});
// 데몬 스레드 풀이 즉시 종료되는 것을 방지하기 위한 대기
TimeUnit.SECONDS.sleep(2);
pool.shutdown();
}
}
작업 체이닝과 결과 변환
여러 비동기 작업을 순차적으로 연결해야 할 때는 thenApply, handle, thenAccept, thenRun 등의 메서드를 사용합니다.
결과 변환 및 예외 복구: thenApply vs handle
thenApply는 이전 작업의 결과를 받아 새로운 결과로 변환하지만, 이전 단계에서 예외가 발생하면 체인이 중단됩니다. 반면 handle은 예외가 발생하더라도 이를 인자로 받아 후속 처리를 계속 진행할 수 있습니다.
public class ChainingDemo {
public static void main(String[] args) throws Exception {
CompletableFuture.supplyAsync(() -> 10)
.handle((value, err) -> {
if (err != null) return 0;
return value * 2; // 20
})
.thenApply(value -> {
System.out.println("Transformed value: " + value);
return value + 5; // 25
})
.thenAccept(finalValue -> System.out.println("Consumed final value: " + finalValue));
TimeUnit.SECONDS.sleep(1);
}
}
결과 소모와 독립 실행: thenAccept vs thenRun
thenAccept: 이전 작업의 결과를 소비하지만 반환 값은 없습니다.thenRun: 이전 작업의 결과와 상관없이 단순히 다음Runnable 작업을 실행합니다.
스레드 풀 실행 전략: Async 접미사의 의미
체인 메서드에는 thenRun과 thenRunAsync처럼 Async 접미사가 붙은 변형이 존재합니다. 이 두 방식의 스레드 할당 전략은 다음과 같이 다릅니다.
- Non-Async (예: thenRun): 이전 작업과 동일한 스레드 풀을 사용하거나, 작업이 매우 빠르게 완료된 경우 현재 스레드(메인 스레드)에서 즉시 실행될 수 있습니다.
- Async (예: thenRunAsync): 명시적으로 스레드 풀을 전달하지 않으면, 이전 작업의 스레드 풀과 무관하게 무조건
ForkJoinPool.commonPool()을 사용하여 새로운 스레드에서 작업을 실행합니다.
public class ThreadPoolStrategyDemo {
public static void main(String[] args) throws Exception {
ExecutorService ioPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("Task 1 (IO Pool): " + Thread.currentThread().getName());
return "Data";
}, ioPool)
.thenRun(() -> {
// 이전 작업과 동일한 ioPool을 사용하거나 호출 스레드에서 실행됨
System.out.println("Task 2 (Sync Chain): " + Thread.currentThread().getName());
})
.thenRunAsync(() -> {
// 강제로 ForkJoinPool.commonPool()을 사용함
System.out.println("Task 3 (Async Chain): " + Thread.currentThread().getName());
});
TimeUnit.SECONDS.sleep(1);
ioPool.shutdown();
}
}
다중 작업 조합: 병렬 처리와 결과 병합
가장 빠른 작업 선택: applyToEither
여러 중복 시스템(예: 다중 CDN 노드 또는 미러 데이터베이스)에 동일한 요청을 보내고, 가장 먼저 응답을 반환하는 작업의 결과만을 채택해야 할 때 유용합니다.
public class FastestResponseDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> nodeA = CompletableFuture.supplyAsync(() -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Response from Node A";
});
CompletableFuture<String> nodeB = CompletableFuture.supplyAsync(() -> {
TimeUnit.MILLISECONDS.sleep(100);
return "Response from Node B";
});
String fastest = nodeA.applyToEither(nodeB, response -> "Winner: " + response).join();
System.out.println(fastest); // Winner: Response from Node B
}
}
독립적인 작업 결과 병합: thenCombine
서로 의존성이 없는 두 비동기 작업이 모두 완료된 후, 각자의 결과를 조합하여 최종 결과를 생성할 때 사용합니다.
public class CombineResultsDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> userProfile = CompletableFuture.supplyAsync(() -> "User: Alice");
CompletableFuture<Integer> userPoints = CompletableFuture.supplyAsync(() -> 1500);
CompletableFuture<String> summary = userProfile.thenCombine(userPoints, (profile, points) ->
profile + " | Points: " + points
);
System.out.println(summary.join()); // User: Alice | Points: 1500
}
}
실전 아키텍처: 다중 외부 API 재고 조회 최적화
분산 환경에서 여러 외부 저장소(예: 지역별 창고)의 재고 상태를 동시에 조회하여 취합해야 하는 시나리오를 가정해 보겠습니다. 동기(Synchronous) 방식과 CompletableFuture를 활용한 비동기(Asynchronous) 방식의 성능 차이를 비교합니다.
public class InventoryAggregationDemo {
static class Warehouse {
private final String region;
public Warehouse(String region) { this.region = region; }
public int checkStock(String itemId) {
// 외부 API 호출 또는 DB 조회 지연 시간 시뮬레이션
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return ThreadLocalRandom.current().nextInt(10, 100);
}
}
static List<Warehouse> warehouses = Arrays.asList(
new Warehouse("US-East"),
new Warehouse("EU-West"),
new Warehouse("APAC-South")
);
// 동기 방식: 순차적 조회
public static List<String> fetchSequentially(String itemId) {
return warehouses.stream()
.map(w -> String.format("Region: %s, Stock: %d", w.region, w.checkStock(itemId)))
.collect(Collectors.toList());
}
// 비동기 방식: 병렬 조회
public static List<String> fetchAsynchronously(String itemId) {
List<CompletableFuture<String>> futures = warehouses.stream()
.map(w -> CompletableFuture.supplyAsync(() ->
String.format("Region: %s, Stock: %d", w.region, w.checkStock(itemId))
))
.collect(Collectors.toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
fetchSequentially("ITEM-99").forEach(System.out::println);
System.out.println("Sequential Time: " + (System.currentTimeMillis() - start) + "ms");
start = System.currentTimeMillis();
fetchAsynchronously("ITEM-99").forEach(System.out::println);
System.out.println("Asynchronous Time: " + (System.currentTimeMillis() - start) + "ms");
}
}
동기 방식은 각 창고의 조회 지연 시간이 누적되어 총 3초 이상의 응답 시간이 소요되지만, 비동기 방식은 모든 창고에 동시에 요청을 보내므로 단일 창고의 최대 지연 시간인 약 1초 내외에 모든 결과를 취합할 수 있습니다. 이는 I/O 바운드(I/O-bound) 작업이 많은 현대의 MSA(Microservices Architecture) 환경에서 시스템 처리량을 극대화하는 핵심 패턴입니다.