Spring Boot 기반 MQTT를 활용한 IoT 애플리케이션 구축

MQTT란?

MQTT(Message Queuing Telemetry Transport)는 경량의 발행/구독 기반 메시지 프로토콜로, 주로 IoT 장치 간의 통신에 사용됩니다. 저대역폭, 고지연 네트워크 환경에서도 안정적인 데이터 전송을 보장하며, TCP/IP 기반으로 동작합니다.

의존성 설정

Spring Boot 프로젝트에서 MQTT를 사용하기 위해선 Spring Integration과 Paho 클라이언트 라이브러리를 포함해야 합니다. 아래는 필요한 Maven 의존성입니다.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

MQTT 설정 정보

application.yml 파일에 브로커 연결 정보를 정의합니다.

mqtt:
  broker: tcp://mqtt.rootcloudapp.com:1883
  username: 0ba851e2e83609b9
  password: 81757448a4df0d73
  client-id: mqtt-client-${random.uuid}
  keep-alive: 60
  timeout: 30
  topic: v4/p/post/thing/live/json/1.1
  qos: 1

설정 클래스

@ConfigurationProperties를 사용해 설정 값을 자바 객체로 바인딩합니다.

@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private String broker;
    private String username;
    private String password;
    private String clientId;
    private int keepAlive;
    private int timeout;
    private String topic;
    private int qos;
}

MQTT 클라이언트 구성

Paho 기반 MqttClient 인스턴스를 생성하고, 연결 옵션을 설정합니다.

@Slf4j
public class MqttClientWrapper {

    private MqttClient client;
    private final MqttProperties properties;

    public MqttClientWrapper(MqttProperties properties) {
        this.properties = properties;
    }

    public void connect() throws MqttException {
        try {
            client = new MqttClient(properties.getBroker(), properties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(properties.getUsername());
            options.setPassword(properties.getPassword().toCharArray());
            options.setKeepAliveInterval(properties.getKeepAlive());
            options.setConnectionTimeout(properties.getTimeout());
            options.setCleanSession(false);

            client.setCallback(new DefaultMqttCallback());
            client.connect(options);
            log.info("MQTT 브로커에 성공적으로 연결되었습니다.");

            subscribe(properties.getTopic(), properties.getQos());
        } catch (MqttException e) {
            log.error("MQTT 연결 실패: {}", e.getMessage());
            throw e;
        }
    }

    public void publish(String topic, String payload) {
        if (client != null && client.isConnected()) {
            try {
                MqttMessage message = new MqttMessage(payload.getBytes());
                message.setQos(properties.getQos());
                client.publish(topic, message);
                log.debug("메시지 발행 완료 - Topic: {}, Payload: {}", topic, payload);
            } catch (MqttException e) {
                log.error("메시지 발행 중 오류 발생: {}", e.getMessage());
            }
        } else {
            log.warn("MQTT 클라이언트가 연결되지 않아 메시지를 발행할 수 없습니다.");
        }
    }

    public void subscribe(String topic, int qos) throws MqttException {
        if (client != null && client.isConnected()) {
            client.subscribe(topic, qos);
            log.info("주제 구독 완료: {} (QoS: {})", topic, qos);
        }
    }
}

콜백 처리

수신된 메시지 및 연결 상태 변경을 처리하기 위한 콜백 클래스입니다.

@Slf4j
public class DefaultMqttCallback implements MqttCallback {

    @Override
    public void connectionLost(Throwable cause) {
        log.warn("MQTT 연결이 끊어졌습니다. 재연결 시도 중...");
        // 재연결 로직은 별도의 서비스나 스레드에서 관리하는 것이 좋음
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        log.info("수신된 메시지 - 주제: {}, 내용: {}", topic, payload);

        // JSON 파싱 후 비즈니스 로직 처리 예시
        if (topic.contains("datapoint")) {
            DataPointEvent event = JsonUtil.fromJson(payload, DataPointEvent.class);
            EventProcessor.process(event);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.debug("메시지 전송 완료: {}", token.isComplete());
    }
}

애플리케이션 시작 시 연결

ApplicationRunner를 통해 애플리케이션 구동 시 자동으로 MQTT 서버에 연결합니다.

@Component
@Slf4j
public class MqttStartupRunner implements ApplicationRunner {

    private final MqttClientWrapper mqttClient;
    private final MqttProperties properties;

    public MqttStartupRunner(MqttClientWrapper mqttClient, MqttProperties properties) {
        this.mqttClient = mqttClient;
        this.properties = properties;
    }

    @Override
    public void run(ApplicationArguments args) {
        try {
            mqttClient.connect();
        } catch (MqttException e) {
            log.error("초기 MQTT 연결 실패", e);
        }
    }
}

메시지 송수신 컨트롤러

외부 요청을 통해 메시지를 발행하거나 구독을 조작할 수 있는 REST 엔드포인트입니다.

@RestController
@RequestMapping("/api/mqtt")
public class MqttController {

    private final MqttClientWrapper mqttClient;
    private final MqttProperties properties;

    public MqttController(MqttClientWrapper mqttClient, MqttProperties properties) {
        this.mqttClient = mqttClient;
        this.properties = properties;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        mqttClient.publish(properties.getTopic(), message);
        return ResponseEntity.ok("메시지 전송 완료");
    }

    @GetMapping("/subscribe")
    public ResponseEntity<String> subscribeToTopic() throws MqttException {
        mqttClient.subscribe(properties.getTopic(), properties.getQos());
        return ResponseEntity.ok("구독 시작");
    }
}

비동기 처리와 재연결 전략

실제 운영 환경에서는 네트워크 불안정에 대비해 재연결 로직과 백오프 정책을 추가하는 것이 중요합니다. ScheduledExecutorService를 사용하여 주기적인 상태 점검과 자동 재연결을 구현할 수 있습니다.

태그: Spring Boot MQTT IoT Paho Message Broker

7월 4일 01:08에 게시됨