Java CompletableFuture를 활용한 효율적인 비동기 프로그래밍 및 작업 조합

기존 Future 인터페이스의 한계

Java의 Future 인터페이스는 비동기 작업의 결과를 나타내고 관리하는 데 사용됩니다. FutureTaskCallable을 결합하면 메인 스레드와 별도로 실행되는 백그라운드 작업을 생성할 수 있습니다. 그러나 기존 Future는 다음과 같은 명확한 한계를 가집니다.

1. get() 메서드의 블로킹 문제

get() 메서드를 호출하면 작업이 완료될 때까지 현재 스레드가 블로킹됩니다. 작업 완료 여부를 알 수 없는 상황에서 get()을 호출하면 메인 스레드의 자원이 낭비되며 전체 애플리케이션의 처리량이 떨어질 수 있습니다.

2. isDone() 폴링의 자원 소모

블로킹을 피하기 위해 isDone() 메서드를 반복적으로 호출하여 작업 완료 여부를 확인하는 폴링(Polling) 방식을 사용할 수 있습니다. 하지만 이 방식은 작업이 완료되지 않은 동안에도 CPU 사이클을 지속적으로 소모하여 시스템 전체의 성능을 저하시킵니다.

CompletableFuture의 등장과 핵심 개념

이러한 한계를 극복하기 위해 JDK 8에서는 CompletableFuture가 도입되었습니다. 이 클래스는 FutureCompletionStage 인터페이스를 모두 구현하여, 블로킹이나 폴링 없이 콜백(Callback) 방식으로 비동기 작업의 완료 시점을 처리할 수 있게 해줍니다. 또한 함수형 프로그래밍 패러다임을 적용하여 여러 비동기 작업을 유연하게 조합하고 체이닝(Chaining)할 수 있는 강력한 API를 제공합니다.

비동기 작업 생성하기

CompletableFuture는 정적 팩토리 메서드를 통해 비동기 작업을 생성합니다. 반환 값의 유무와 사용자 정의 스레드 풀 사용 여부에 따라 네 가지 핵심 메서드로 나뉩니다. 명시적으로 스레드 풀을 지정하지 않으면 기본적으로 ForkJoinPool.commonPool()을 사용합니다.

반환 값이 없는 작업: runAsync

public class AsyncExecutionDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService customPool = Executors.newFixedThreadPool(2);

        // 기본 스레드 풀 사용
        CompletableFuture<Void> defaultTask = CompletableFuture.runAsync(() -> {
            System.out.println("Default Pool: " + Thread.currentThread().getName());
        });

        // 사용자 정의 스레드 풀 사용
        CompletableFuture<Void> customTask = CompletableFuture.runAsync(() -> {
            System.out.println("Custom Pool: " + Thread.currentThread().getName());
        }, customPool);

        defaultTask.get();
        customTask.get();
        customPool.shutdown();
    }
}

반환 값이 있는 작업: supplyAsync

public class AsyncSupplyDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> dataFetchTask = CompletableFuture.supplyAsync(() -> {
            System.out.println("Fetching data on: " + Thread.currentThread().getName());
            return "Payload-Data-123";
        });

        System.out.println("Result: " + dataFetchTask.get());
    }
}

콜백을 통한 결과 및 예외 처리

whenCompleteexceptionally 메서드를 사용하면 작업이 정상적으로 완료되었을 때와 예외가 발생했을 때의 로직을 분리하여 처리할 수 있습니다. 이를 통해 블로킹 없이 이벤트 기반의 후속 작업을 정의할 수 있습니다.

public class CallbackHandlingDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        CompletableFuture.supplyAsync(() -> {
            int randomValue = ThreadLocalRandom.current().nextInt(1, 10);
            if (randomValue > 7) {
                throw new RuntimeException("Value exceeded threshold: " + randomValue);
            }
            return randomValue;
        }, pool)
        .whenComplete((result, throwable) -> {
            if (throwable == null) {
                System.out.println("Successfully processed value: " + result);
            }
        })
        .exceptionally(throwable -> {
            System.err.println("Error captured: " + throwable.getMessage());
            return -1; // Fallback value
        });

        // 데몬 스레드 풀이 즉시 종료되는 것을 방지하기 위한 대기
        TimeUnit.SECONDS.sleep(2);
        pool.shutdown();
    }
}

작업 체이닝과 결과 변환

여러 비동기 작업을 순차적으로 연결해야 할 때는 thenApply, handle, thenAccept, thenRun 등의 메서드를 사용합니다.

결과 변환 및 예외 복구: thenApply vs handle

thenApply는 이전 작업의 결과를 받아 새로운 결과로 변환하지만, 이전 단계에서 예외가 발생하면 체인이 중단됩니다. 반면 handle은 예외가 발생하더라도 이를 인자로 받아 후속 처리를 계속 진행할 수 있습니다.

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> 10)
            .handle((value, err) -> {
                if (err != null) return 0;
                return value * 2; // 20
            })
            .thenApply(value -> {
                System.out.println("Transformed value: " + value);
                return value + 5; // 25
            })
            .thenAccept(finalValue -> System.out.println("Consumed final value: " + finalValue));
        
        TimeUnit.SECONDS.sleep(1);
    }
}

결과 소모와 독립 실행: thenAccept vs thenRun

  • thenAccept: 이전 작업의 결과를 소비하지만 반환 값은 없습니다.
  • thenRun: 이전 작업의 결과와 상관없이 단순히 다음Runnable 작업을 실행합니다.

스레드 풀 실행 전략: Async 접미사의 의미

체인 메서드에는 thenRunthenRunAsync처럼 Async 접미사가 붙은 변형이 존재합니다. 이 두 방식의 스레드 할당 전략은 다음과 같이 다릅니다.

  • Non-Async (예: thenRun): 이전 작업과 동일한 스레드 풀을 사용하거나, 작업이 매우 빠르게 완료된 경우 현재 스레드(메인 스레드)에서 즉시 실행될 수 있습니다.
  • Async (예: thenRunAsync): 명시적으로 스레드 풀을 전달하지 않으면, 이전 작업의 스레드 풀과 무관하게 무조건 ForkJoinPool.commonPool()을 사용하여 새로운 스레드에서 작업을 실행합니다.
public class ThreadPoolStrategyDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService ioPool = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 1 (IO Pool): " + Thread.currentThread().getName());
            return "Data";
        }, ioPool)
        .thenRun(() -> {
            // 이전 작업과 동일한 ioPool을 사용하거나 호출 스레드에서 실행됨
            System.out.println("Task 2 (Sync Chain): " + Thread.currentThread().getName());
        })
        .thenRunAsync(() -> {
            // 강제로 ForkJoinPool.commonPool()을 사용함
            System.out.println("Task 3 (Async Chain): " + Thread.currentThread().getName());
        });

        TimeUnit.SECONDS.sleep(1);
        ioPool.shutdown();
    }
}

다중 작업 조합: 병렬 처리와 결과 병합

가장 빠른 작업 선택: applyToEither

여러 중복 시스템(예: 다중 CDN 노드 또는 미러 데이터베이스)에 동일한 요청을 보내고, 가장 먼저 응답을 반환하는 작업의 결과만을 채택해야 할 때 유용합니다.

public class FastestResponseDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> nodeA = CompletableFuture.supplyAsync(() -> {
            TimeUnit.MILLISECONDS.sleep(300);
            return "Response from Node A";
        });

        CompletableFuture<String> nodeB = CompletableFuture.supplyAsync(() -> {
            TimeUnit.MILLISECONDS.sleep(100);
            return "Response from Node B";
        });

        String fastest = nodeA.applyToEither(nodeB, response -> "Winner: " + response).join();
        System.out.println(fastest); // Winner: Response from Node B
    }
}

독립적인 작업 결과 병합: thenCombine

서로 의존성이 없는 두 비동기 작업이 모두 완료된 후, 각자의 결과를 조합하여 최종 결과를 생성할 때 사용합니다.

public class CombineResultsDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> userProfile = CompletableFuture.supplyAsync(() -> "User: Alice");
        CompletableFuture<Integer> userPoints = CompletableFuture.supplyAsync(() -> 1500);

        CompletableFuture<String> summary = userProfile.thenCombine(userPoints, (profile, points) -> 
            profile + " | Points: " + points
        );

        System.out.println(summary.join()); // User: Alice | Points: 1500
    }
}

실전 아키텍처: 다중 외부 API 재고 조회 최적화

분산 환경에서 여러 외부 저장소(예: 지역별 창고)의 재고 상태를 동시에 조회하여 취합해야 하는 시나리오를 가정해 보겠습니다. 동기(Synchronous) 방식과 CompletableFuture를 활용한 비동기(Asynchronous) 방식의 성능 차이를 비교합니다.

public class InventoryAggregationDemo {
    
    static class Warehouse {
        private final String region;
        public Warehouse(String region) { this.region = region; }
        
        public int checkStock(String itemId) {
            // 외부 API 호출 또는 DB 조회 지연 시간 시뮬레이션
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            return ThreadLocalRandom.current().nextInt(10, 100);
        }
    }

    static List<Warehouse> warehouses = Arrays.asList(
        new Warehouse("US-East"),
        new Warehouse("EU-West"),
        new Warehouse("APAC-South")
    );

    // 동기 방식: 순차적 조회
    public static List<String> fetchSequentially(String itemId) {
        return warehouses.stream()
            .map(w -> String.format("Region: %s, Stock: %d", w.region, w.checkStock(itemId)))
            .collect(Collectors.toList());
    }

    // 비동기 방식: 병렬 조회
    public static List<String> fetchAsynchronously(String itemId) {
        List<CompletableFuture<String>> futures = warehouses.stream()
            .map(w -> CompletableFuture.supplyAsync(() -> 
                String.format("Region: %s, Stock: %d", w.region, w.checkStock(itemId))
            ))
            .collect(Collectors.toList());

        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        fetchSequentially("ITEM-99").forEach(System.out::println);
        System.out.println("Sequential Time: " + (System.currentTimeMillis() - start) + "ms");

        start = System.currentTimeMillis();
        fetchAsynchronously("ITEM-99").forEach(System.out::println);
        System.out.println("Asynchronous Time: " + (System.currentTimeMillis() - start) + "ms");
    }
}

동기 방식은 각 창고의 조회 지연 시간이 누적되어 총 3초 이상의 응답 시간이 소요되지만, 비동기 방식은 모든 창고에 동시에 요청을 보내므로 단일 창고의 최대 지연 시간인 약 1초 내외에 모든 결과를 취합할 수 있습니다. 이는 I/O 바운드(I/O-bound) 작업이 많은 현대의 MSA(Microservices Architecture) 환경에서 시스템 처리량을 극대화하는 핵심 패턴입니다.

태그: java CompletableFuture concurrency AsynchronousProgramming threadpool

6월 8일 01:05에 게시됨