Kafka 통합을 위한 Spring Boot 설정
Spring Boot 애플리케이션에서 Apache Kafka와의 통신을 구현하기 위해선 적절한 의존성 추가와 구성이 필요하다. 본 문서는 인증 보안(SASL/SCRAM)이 적용된 Kafka 클러스터와의 연결부터, 메시지 송수신 로직 구현까지를 다룬다.
의존성 설정 (Maven)
Kafka 통신을 위해 spring-kafka 모듈을 프로젝트에 포함시킨다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.5</version>
</dependency>
프로듀서 측 설정 (application.yml)
Kafka 프로듀서는 아래와 같은 보안 및 성능 관련 파라미터를 포함하여 구성된다.
spring:
kafka:
bootstrap-servers: 17*.***.***.***:19091,17*.***.***.***:19091,17*.***.***.***:19091
properties:
sasl:
mechanism: SCRAM-SHA-256
jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="***" password="***";
security.protocol: SASL_PLAINTEXT
producer:
retries: 3
acks: all
batch-size: 512000
properties:
linger.ms: 1
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
compression-type: lz4
properties:
enable.idempotence: false
메시지 전송 유틸리티 클래스
전역적으로 사용할 수 있는 Kafka 전송 헬퍼 클래스를 정의한다. 스프링 컨텍스트에서 KafkaTemplate을 주입받아 비동기 전송과 콜백 처리를 수행한다.
package com.example.util;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;
@Slf4j
public class KafkaMessageSender {
private static final KafkaTemplate<String, String> template = SpringUtil.getBean(KafkaTemplate.class);
private static final SuccessCallback onSuccess = result -> log.info("메시지 전송 성공: {}", result);
private static final FailureCallback onFailure = ex -> log.error("메시지 전송 실패: ", ex);
public static <T> void sendMessage(String topic, T payload) {
sendMessage(topic, payload, onSuccess, onFailure);
}
public static <T> void sendMessage(String topic, T payload, SuccessCallback success, FailureCallback failure) {
String jsonPayload = JSONObject.toJSONString(payload);
template.send(topic, jsonPayload).addCallback(success, failure);
}
}
샘플 데이터 생성 및 전송 서비스
테스트용 데이터를 생성하고 Kafka 토픽으로 전송하는 서비스 예제이다.
@Service
@Slf4j
public class DataProducerService {
@Value("${test.maxrows}")
private int maxRows;
private static final String[] MOBILE_PREFIXES = {
"134", "135", "136", "137", "138", "139",
"150", "151", "152", "157", "158", "159",
"180", "181", "182", "183", "184", "185",
"174", "192", "178"
};
public void publishTestData() {
Random random = new Random();
for (int i = 0; i < maxRows; i++) {
DataRecord record = new DataRecord();
record.setTimestamp(new Date());
record.setPhoneNumber(generatePhoneNumber(random));
record.setValue1(random.nextFloat());
record.setValue2(random.nextDouble());
MessageEnvelope envelope = new MessageEnvelope();
envelope.setData(JSONObject.toJSONString(record));
envelope.setRequestId(java.util.UUID.randomUUID().toString());
envelope.setSendTime(new Date());
KafkaMessageSender.sendMessage("user-activity-test", envelope);
}
}
private String generatePhoneNumber(Random rand) {
StringBuilder sb = new StringBuilder(MOBILE_PREFIXES[rand.nextInt(MOBILE_PREFIXES.length)]);
for (int i = 0; i < 8; i++) {
sb.append(rand.nextInt(10));
}
return sb.toString();
}
}
컨슈머 측 yml 설정
메시지 소비자는 수동 커밋 모드로 동작하며, 배치 처리와 오류 핸들링을 고려해 구성한다.
spring:
kafka:
consumer:
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1000
properties:
session.timeout.ms: 120000
request.timeout.ms: 600000
listener:
ack-mode: manual_immediate
concurrency: 3
type: batch
missing-topics-fatal: false
properties:
sasl:
mechanism: SCRAM-SHA-256
jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="***" password="***";
security.protocol: SASL_PLAINTEXT
고급 컨슈머 팩토리 설정
배치 및 단일 레코드 오류 시 Redis로 장애 메시지를 저장하는 커스텀 에러 핸들러를 정의한다.
@Component
@Slf4j
public class KafkaConsumerConfiguration {
@Autowired
private ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory;
@Autowired
private RedisClient redisClient; // 커스텀 Redis 접근 컴포넌트
@Bean("customListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> customContainerFactory() {
containerFactory.getContainerProperties().setPollTimeout(3000);
// 단일 레코드 오류 처리
containerFactory.setErrorHandler((exception, record, consumer) -> {
log.error("단건 수신 오류: {}", record.topic(), exception);
try {
MessageEnvelope msg = JSONUtil.toBean(record.value().toString(), MessageEnvelope.class);
redisClient.saveToHash("failed-messages", msg.getRequestId(), record.value().toString());
} catch (Exception e) {
log.warn("Redis 저장 실패", e);
}
});
// 배치 오류 처리
containerFactory.setBatchErrorHandler((exception, records) -> {
log.error("배치 수신 중 오류 발생", exception);
records.forEach(record -> {
try {
MessageEnvelope msg = JSONUtil.toBean(record.value().toString(), MessageEnvelope.class);
redisClient.saveToHash("failed-messages", msg.getRequestId(), record.value().toString());
} catch (Exception e) {
log.warn("개별 레코드 처리 실패", e);
}
});
});
return containerFactory;
}
}
메시지 수신 및 처리 서비스
Kafka 리스너를 통해 메시지를 수신하고, 파이프라인 방식으로 Redis에 일괄 저장한다.
@Service
@Slf4j
public class MessageConsumerService {
@Resource(name = "secondaryRedisTemplate")
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ExecutorService processingPool;
@KafkaListener(
topics = "user-activity-test",
groupId = "test-consumer-group",
containerFactory = "customListenerContainerFactory",
concurrency = "16"
)
public void processMessages(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
log.info("{}개의 메시지 수신", records.size());
long start = System.currentTimeMillis();
try {
List<MessageEnvelope> envelopes = records.stream()
.map(record -> JSONUtil.toBean(record.value(), MessageEnvelope.class))
.toList();
// Redis 파이프라이닝을 통한 고속 저장
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
Random rand = new Random();
for (MessageEnvelope envelope : envelopes) {
String key = "event:" + rand.nextInt(1000);
byte[] val = envelope.getData().getBytes();
connection.setEx(key.getBytes(), 120, val);
}
return null;
});
} catch (Exception e) {
log.error("메시지 처리 중 오류", e);
} finally {
log.info("처리 완료, 소요 시간: {}ms", System.currentTimeMillis() - start);
ack.acknowledge(); // 수동 커밋
}
}
}