Flume 아키텍처
Flume는 대규모 로그 데이터 수집, 집계 및 전송을 위한 분산형, 신뢰성 있는, 고가용성 시스템입니다. 다양한 데이터 소스에서 데이터를 수집하고, 처리 후 목적지로 전송하는 기능을 제공합니다. 이 시스템에서는 데이터 흐름을 '이벤트(Event)'라는 단위로 관리하며, 이벤트는 바이트 배열 형태의 데이터와 헤더 정보를 포함합니다.
이벤트(Event)란?
이벤트는 Flume의 기본 데이터 단위로, 외부 소스에서 생성된 데이터를 포함합니다. 이벤트는 소스(Source)에서 포맷팅 후 채널(Channel)로 전달되며, 싱크(Sink)에서 처리됩니다. 이벤트는 전송 과정에서 트랜잭션 단위로 보장되어, 데이터 손실 없이 전송됩니다.
Agent 구성 요소
Flume의 핵심은 Agent입니다. Agent는 JVM을 기반으로 작동하며, Source, Channel, Sink 세 가지 주요 구성 요소로 구성됩니다. 이 구성 요소들은 생산자-버퍼-소비자의 구조로 동작합니다.
- Source: 다양한 형식의 로그 데이터를 수집하는 역할. Avro, HTTP, 네트워크 통신 등 다양한 프로토콜 지원.
- Channel: 데이터를 일시적으로 저장하는 버퍼. 메모리, 파일, JDBC 등 다양한 저장 방식 지원.
- Sink: 데이터를 최종 목적지로 전송하는 컴포넌트. HDFS, HBase, 다른 Agent의 Source 등으로 연결 가능.
Flume 실행 메커니즘
Flume은 데이터를 수집 후 목적지로 전송하기 위해 중간 캐시(Channel)를 사용합니다. 데이터가 정상적으로 목적지에 도착한 후에만 캐시 데이터를 삭제합니다. 이 과정에서 Source, Channel, Sink의 조합이 유연하게 설정 가능하며, 사용자 정의 구성 파일을 통해 다양한 결합 방식을 구현할 수 있습니다.
Flume 신뢰성
Flume는 트랜잭션 기반의 메커니즘으로 데이터 전송 과정의 신뢰성을 보장합니다. Sink는 이벤트가 다음 Agent로 전달되거나 외부 저장소에 성공적으로 저장된 후에만 Channel에서 데이터를 제거합니다. 이 방식으로 Agent 간의 데이터 흐름에서도 손실 없는 전송이 가능합니다.
다중 Agent 연동 사례
여러 Agent를 순차적으로 연결하여 데이터를 수집하고 최종 저장 시스템으로 전송할 수 있습니다. 예를 들어, 로그 수집 Agent → 중간 처리 Agent → HDFS 저장 Agent의 구조를 사용할 수 있습니다. 하지만 Agent 수를 증가시키면 복잡도가 높아지므로 실무에서는 적절한 균형이 필요합니다.
사용 사례
사례 1: 콘솔 입력 데이터 출력
netcat 소스, 메모리 채널, 로거 싱크를 사용한 예제입니다.
# 구성 파일 설정
agentConfig.sources = r1
agentConfig.channels = c1
agentConfig.sinks = k1
agentConfig.sources.r1.type = netcat
agentConfig.sources.r1.bind = 192.168.1.100
agentConfig.sources.r1.port = 8888
agentConfig.sources.r1.channels = c1
agentConfig.channels.c1.type = memory
agentConfig.channels.c1.capacity = 1000
agentConfig.channels.c1.transactionCapacity = 1000
agentConfig.sinks.k1.type = logger
agentConfig.sinks.k1.channel = c1
사례 2: 로컬 파일 → HDFS 전송
spooldir 소스, 메모리 채널, HDFS 싱크를 사용한 예제입니다.
# 구성 파일 설정
agentConfig.sources = r1
agentConfig.channels = c1
agentConfig.sinks = k1
agentConfig.sources.r1.type = spooldir
agentConfig.sources.r1.spoolDir = /path/to/logs
agentConfig.sources.r1.interceptors = i1
agentConfig.sources.r1.interceptors.i1.type = timestamp
agentConfig.channels.c1.type = memory
agentConfig.channels.c1.capacity = 10000
agentConfig.channels.c1.transactionCapacity = 1000
agentConfig.sinks.k1.type = hdfs
agentConfig.sinks.k1.hdfs.path = hdfs://cluster/path
agentConfig.sinks.k1.hdfs.filePrefix = logs_
agentConfig.sinks.k1.hdfs.fileType = DataStream
agentConfig.sinks.k1.hdfs.rollCount = 100
사례 3: Java 코드에서 HDFS 전송
Avro 소스, 메모리 채널, HDFS 싱크를 사용한 예제입니다.
// Java 코드 예시
public class LogProducer {
public static void main(String[] args) throws Exception {
while (true) {
String logEntry = "Log entry: " + System.currentTimeMillis();
// Avro 소스에 데이터 전송 로직
sendToFlume(logEntry);
Thread.sleep(1000);
}
}
}
사례 4: HBase 로그 모니터링
exec 소스, 메모리 채널, HBase 싱크를 사용한 예제입니다.
# 구성 파일 설정
agentConfig.sources = r1
agentConfig.channels = c1
agentConfig.sinks = k1
agentConfig.sources.r1.type = exec
agentConfig.sources.r1.command = tail -F /var/log/app.log
agentConfig.channels.c1.type = memory
agentConfig.channels.c1.capacity = 10000
agentConfig.channels.c1.transactionCapacity = 100
agentConfig.sinks.k1.type = hbase
agentConfig.sinks.k1.table = monitored_logs
agentConfig.sinks.k1.columnFamily = cf1
agentConfig.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHBaseEventSerializer
사례 5: HTTP 소스 활용
HTTP 소스, 메모리 채널, 로거 싱크를 사용한 예제입니다.
# 구성 파일 설정
agentConfig.sources = r1
agentConfig.channels = c1
agentConfig.sinks = k1
agentConfig.sources.r1.type = http
agentConfig.sources.r1.port = 50000
agentConfig.sources.r1.channels = c1
agentConfig.sinks.k1.type = logger
agentConfig.sinks.k1.channel = c1
agentConfig.channels.c1.type = memory
agentConfig.channels.c1.capacity = 10000
agentConfig.channels.c1.transactionCapacity = 100