Java 동시성 제어: CountDownLatch와 CyclicBarrier 심층 분석

Java 동시성 프로그래밍에서 스레드 간 실행 순서나 특정 지점에서의 동기화를 제어하는 것은 매우 중요합니다. CountDownLatchCyclicBarrier는 이러한 목적으로 사용되는 두 가지 대표적인 동기화 도구입니다. 이 글에서는 두 도구의 개념, 차이점, 실제 사용 예제, 그리고 내부 동작 원리를 소스 코드 수준에서 살펴보겠습니다.

1. CountDownLatch (Latch)

CountDownLatch는 하나 이상의 스레드가 다른 스레드에서 수행되는 일련의 작업이 완료될 때까지 기다릴 수 있게 해주는 동기화 보조 도구입니다. 초기화 시 양의 정수 카운터를 설정합니다. await() 메서드는 현재 카운터 값이 0이 될 때까지 현재 스레드를 대기 상태로 만듭니다. countDown() 메서드는 카운터를 감소시킵니다. 한번 카운터가 0에 도달하면 모든 대기 중인 스레드가 해제되며, 이후의 await() 호출은 즉시 반환됩니다. 이는 일회성 이벤트입니다.

적용 사례

  • 애플리케이션 시작 시 모든 필수 서비스나 리소스가 초기화될 때까지 메인 스레드 대기.
  • 대규모 계산 작업을 여러 서브 작업으로 분할하고, 모든 서브 작업이 완료될 때까지 기다린 후 결과를 집계.
  • 온라인 게임에서 모든 플레이어가 "준비" 상태가 될 때까지 게임 시작을 지연.

2. CyclicBarrier (Barrier)

CyclicBarrier는 여러 스레드가 서로를 기다리며 특정 지점(배리어)에서 만날 수 있도록 해주는 동기화 도구입니다. 배리어는 모든 참여 스레드가 await()을 호출할 때까지 대기하며, 마지막 스레드가 도착하면 배리어가 열리고 모든 대기 스레드가 해제됩니다. CountDownLatch와의 핵심 차이점은 재사용성입니다. CyclicBarrier는 리셋되어 다시 사용할 수 있습니다(cyclic). 선택적으로 배리어가 열릴 때 실행될 Runnable 작업(배리어 액션)을 설정할 수 있습니다.

적용 사례

  • 병렬 알고리즘에서 각 스레드가 자신의 작업 단계를 마친 후, 다음 단계로 넘어가기 전에 모든 동료 스레드가 도착할 때까지 기다리는 경우.
  • 분산 시스템에서 여러 노드가 특정 상태에 도달했는지 확인하는 동기화 지점.

3. 실전 예제: CountDownLatch

세 명의 작업자(Worker)가 있고, 관리자(Boss)는 모든 작업자가 하루 일을 끝마친 후에야 검사를 시작한다고 가정합니다. CountDownLatch를 사용하여 구현합니다.

작업자 (Worker)

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Worker implements Runnable {
    private final CountDownLatch latch;
    private final String name;

    public Worker(CountDownLatch latch, String name) {
        this.latch = latch;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(name + "이(가) 작업을 시작합니다.");
        try {
            int workTime = new Random().nextInt(10) + 1;
            TimeUnit.SECONDS.sleep(workTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println(name + "이(가) 작업을 완료했습니다.");
        latch.countDown();
    }
}

관리자 (Boss)

import java.util.concurrent.CountDownLatch;

public class Boss implements Runnable {
    private final CountDownLatch latch;

    public Boss(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("관리자: 모든 작업자가 끝날 때까지 기다리는 중...");
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("관리자: 모든 작업이 완료되었습니다. 검사 시작!");
    }
}

실행 코드

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(3);

        executor.execute(new Worker(latch, "김철수"));
        executor.execute(new Worker(latch, "이영희"));
        executor.execute(new Worker(latch, "박민수"));
        executor.execute(new Boss(latch));

        executor.shutdown();
    }
}

실행 결과 (예시)

김철수이(가) 작업을 시작합니다.
이영희이(가) 작업을 시작합니다.
관리자: 모든 작업자가 끝날 때까지 기다리는 중...
박민수이(가) 작업을 시작합니다.
박민수이(가) 작업을 완료했습니다.
김철수이(가) 작업을 완료했습니다.
이영희이(가) 작업을 완료했습니다.
관리자: 모든 작업이 완료되었습니다. 검사 시작!

4. 실전 예제: CyclicBarrier

세 명의 작업자가 다리 건설 프로젝트를 진행합니다. 각자는 말뚝을 박는 작업을 수행하며, 세 명 모두 말뚝 박기를 완료한 후에야 함께 다리 상판을 올리는 다음 단계로 넘어갈 수 있습니다. CyclicBarrier를 사용합니다.

작업자 (BridgeWorker)

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BridgeWorker implements Runnable {
    private final CyclicBarrier barrier;
    private final String name;

    public BridgeWorker(CyclicBarrier barrier, String name) {
        this.barrier = barrier;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            System.out.println(name + "이(가) 말뚝 박기 시작.");
            Thread.sleep(4000); // 작업 시뮬레이션
            System.out.println(name + "이(가) 말뚝 박기 완료. 다른 작업자 대기 중...");
            barrier.await();
            System.out.println(name + ": 모두 완료! 다리 상판 작업 시작.");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

실행 코드

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BarrierTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CyclicBarrier barrier = new CyclicBarrier(3, () ->
                System.out.println("--- 모든 말뚝 박기 완료! 다리 상판 작업 시작 준비 ---")
        );

        executor.execute(new BridgeWorker(barrier, "작업자 A"));
        executor.execute(new BridgeWorker(barrier, "작업자 B"));
        executor.execute(new BridgeWorker(barrier, "작업자 C"));

        executor.shutdown();
    }
}

실행 결과

작업자 A이(가) 말뚝 박기 시작.
작업자 B이(가) 말뚝 박기 시작.
작업자 C이(가) 말뚝 박기 시작.
작업자 A이(가) 말뚝 박기 완료. 다른 작업자 대기 중...
작업자 B이(가) 말뚝 박기 완료. 다른 작업자 대기 중...
작업자 C이(가) 말뚝 박기 완료. 다른 작업자 대기 중...
--- 모든 말뚝 박기 완료! 다리 상판 작업 시작 준비 ---
작업자 C: 모두 완료! 다리 상판 작업 시작.
작업자 A: 모두 완료! 다리 상판 작업 시작.
작업자 B: 모두 완료! 다리 상판 작업 시작.

5. 내부 동작 원리 (소스 코드 분석)

5.1 CountDownLatch 핵심 메서드

countDown() 메서드는 내부적으로 AbstractQueuedSynchronizer (AQS)를 사용하여 카운터를 감소시킵니다.

// CountDownLatch.java
public void countDown() {
    sync.releaseShared(1);
}

await() 메서드는 카운터가 0이 될 때까지 현재 스레드를 대기시킵니다.

// CountDownLatch.java
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

AQS의 acquireSharedInterruptiblytryAcquireShared의 결과가 0보다 작으면(즉, 카운터가 0이 아니면) 스레드를 대기 큐에 넣습니다.

5.2 CyclicBarrier 핵심 메서드

await() 메서드는 내부 dowait() 메서드를 호출합니다.

// CyclicBarrier.java
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

dowait() 메서드는 ReentrantLock을 사용하여 스레드 안전성을 보장하며, 카운터(count)를 감소시킵니다. 카운터가 0이 되면 배리어 액션(barrierCommand)이 있다면 실행하고, nextGeneration()을 호출하여 배리어를 리셋합니다. 그렇지 않으면 Condition.await()을 통해 대기 상태에 들어갑니다.

// CyclicBarrier.java (요약)
private int dowait(boolean timed, long nanos) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // ... 중간 코드 생략 ...
        int index = --count;
        if (index == 0) { // 마지막 스레드
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration(); // 배리어 리셋
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        // 카운터가 0이 아니면 대기 루프 진입
        for (;;) {
            try {
                if (!timed)
                    trip.await(); // Condition에서 대기
                // ...
            } // 예외 처리 생략
        }
    } finally {
        lock.unlock();
    }
}

생성자에서 배리어 액션을 설정할 수 있습니다:

public CyclicBarrier(int parties, Runnable barrierAction) {
    // ...
    this.barrierCommand = barrierAction;
}

CountDownLatch와 CyclicBarrier 비교 요약

특징CountDownLatchCyclicBarrier
재사용성일회성, 재사용 불가여러 번 재사용 가능
대기 대상특정 이벤트(카운트다운 완료)다른 스레드의 도착
동작 방식스레드가 카운터를 감소시키고, 다른 스레드가 0이 될 때까지 대기모든 참여 스레드가 await()을 호출할 때까지 서로 대기
배리어 액션없음선택적으로 배리어 열릴 때 실행될 Runnable 제공 가능

태그: CountDownLatch CyclicBarrier Java Concurrency Multi-threading synchronization

6월 17일 05:56에 게시됨