MQTT란?
MQTT(Message Queuing Telemetry Transport)는 경량의 발행/구독 기반 메시지 프로토콜로, 주로 IoT 장치 간의 통신에 사용됩니다. 저대역폭, 고지연 네트워크 환경에서도 안정적인 데이터 전송을 보장하며, TCP/IP 기반으로 동작합니다.
의존성 설정
Spring Boot 프로젝트에서 MQTT를 사용하기 위해선 Spring Integration과 Paho 클라이언트 라이브러리를 포함해야 합니다. 아래는 필요한 Maven 의존성입니다.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
MQTT 설정 정보
application.yml 파일에 브로커 연결 정보를 정의합니다.
mqtt:
broker: tcp://mqtt.rootcloudapp.com:1883
username: 0ba851e2e83609b9
password: 81757448a4df0d73
client-id: mqtt-client-${random.uuid}
keep-alive: 60
timeout: 30
topic: v4/p/post/thing/live/json/1.1
qos: 1
설정 클래스
@ConfigurationProperties를 사용해 설정 값을 자바 객체로 바인딩합니다.
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private String broker;
private String username;
private String password;
private String clientId;
private int keepAlive;
private int timeout;
private String topic;
private int qos;
}
MQTT 클라이언트 구성
Paho 기반 MqttClient 인스턴스를 생성하고, 연결 옵션을 설정합니다.
@Slf4j
public class MqttClientWrapper {
private MqttClient client;
private final MqttProperties properties;
public MqttClientWrapper(MqttProperties properties) {
this.properties = properties;
}
public void connect() throws MqttException {
try {
client = new MqttClient(properties.getBroker(), properties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(properties.getUsername());
options.setPassword(properties.getPassword().toCharArray());
options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout());
options.setCleanSession(false);
client.setCallback(new DefaultMqttCallback());
client.connect(options);
log.info("MQTT 브로커에 성공적으로 연결되었습니다.");
subscribe(properties.getTopic(), properties.getQos());
} catch (MqttException e) {
log.error("MQTT 연결 실패: {}", e.getMessage());
throw e;
}
}
public void publish(String topic, String payload) {
if (client != null && client.isConnected()) {
try {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(properties.getQos());
client.publish(topic, message);
log.debug("메시지 발행 완료 - Topic: {}, Payload: {}", topic, payload);
} catch (MqttException e) {
log.error("메시지 발행 중 오류 발생: {}", e.getMessage());
}
} else {
log.warn("MQTT 클라이언트가 연결되지 않아 메시지를 발행할 수 없습니다.");
}
}
public void subscribe(String topic, int qos) throws MqttException {
if (client != null && client.isConnected()) {
client.subscribe(topic, qos);
log.info("주제 구독 완료: {} (QoS: {})", topic, qos);
}
}
}
콜백 처리
수신된 메시지 및 연결 상태 변경을 처리하기 위한 콜백 클래스입니다.
@Slf4j
public class DefaultMqttCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
log.warn("MQTT 연결이 끊어졌습니다. 재연결 시도 중...");
// 재연결 로직은 별도의 서비스나 스레드에서 관리하는 것이 좋음
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
log.info("수신된 메시지 - 주제: {}, 내용: {}", topic, payload);
// JSON 파싱 후 비즈니스 로직 처리 예시
if (topic.contains("datapoint")) {
DataPointEvent event = JsonUtil.fromJson(payload, DataPointEvent.class);
EventProcessor.process(event);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.debug("메시지 전송 완료: {}", token.isComplete());
}
}
애플리케이션 시작 시 연결
ApplicationRunner를 통해 애플리케이션 구동 시 자동으로 MQTT 서버에 연결합니다.
@Component
@Slf4j
public class MqttStartupRunner implements ApplicationRunner {
private final MqttClientWrapper mqttClient;
private final MqttProperties properties;
public MqttStartupRunner(MqttClientWrapper mqttClient, MqttProperties properties) {
this.mqttClient = mqttClient;
this.properties = properties;
}
@Override
public void run(ApplicationArguments args) {
try {
mqttClient.connect();
} catch (MqttException e) {
log.error("초기 MQTT 연결 실패", e);
}
}
}
메시지 송수신 컨트롤러
외부 요청을 통해 메시지를 발행하거나 구독을 조작할 수 있는 REST 엔드포인트입니다.
@RestController
@RequestMapping("/api/mqtt")
public class MqttController {
private final MqttClientWrapper mqttClient;
private final MqttProperties properties;
public MqttController(MqttClientWrapper mqttClient, MqttProperties properties) {
this.mqttClient = mqttClient;
this.properties = properties;
}
@PostMapping("/publish")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
mqttClient.publish(properties.getTopic(), message);
return ResponseEntity.ok("메시지 전송 완료");
}
@GetMapping("/subscribe")
public ResponseEntity<String> subscribeToTopic() throws MqttException {
mqttClient.subscribe(properties.getTopic(), properties.getQos());
return ResponseEntity.ok("구독 시작");
}
}
비동기 처리와 재연결 전략
실제 운영 환경에서는 네트워크 불안정에 대비해 재연결 로직과 백오프 정책을 추가하는 것이 중요합니다. ScheduledExecutorService를 사용하여 주기적인 상태 점검과 자동 재연결을 구현할 수 있습니다.