Spring Boot MQTT 연동 구현 가이드

개발 환경 및 사전 준비

해당 예제는 Spring Boot 2.7.18 버전을 기준으로 작성되었습니다. pom.xml에 아래와 같이 parent를 설정해야 합니다.

<!-- Inherit from the Spring Boot 2.7.18 starter parent, the version this example was written against. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.18</version>
</parent>

MQTT 의존성 설정

Spring Integration 기반의 MQTT 클라이언트를 사용합니다. 아래 두 개의 의존성을 추가하세요.

<!-- Spring Integration MQTT: core dependency. No version is declared here;
     presumably it is managed by the Spring Boot parent (confirm in your build). -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Eclipse Paho: the MQTT v3 client library, pinned explicitly to 1.2.5. -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

application.properties 설정

# The spring.mqtt.* keys below are read by MqttConfig through @Value injection.
# Broker URI. The ${ip.host} placeholder is not defined in this listing;
# it must resolve from another property source (TODO confirm where it is set).
spring.mqtt.url=tcp://${ip.host}:1883
# Base client ID: application name plus a random UUID. MqttConfig appends
# "-producer" / "-consumer" to it for the outbound and inbound clients.
spring.mqtt.client-id=${spring.application.name}-${random.uuid}
# Broker credentials (sample values).
spring.mqtt.username=admin
spring.mqtt.password=public
# Topic configured as the outbound handler's default topic.
spring.mqtt.default-topic=test/mqtt/topic
# Passed to MqttConnectOptions.setConnectionTimeout (presumably seconds; see the Paho docs).
spring.mqtt.timeout=60
# Passed to MqttConnectOptions.setKeepAliveInterval (presumably seconds; see the Paho docs).
spring.mqtt.keep-alive=60
# Passed to MqttConnectOptions.setCleanSession.
spring.mqtt.clean-session=true
# QoS applied to both the outbound handler (default QoS) and the inbound adapter.
spring.mqtt.qos=1
# Log levels for the Spring Integration MQTT and Eclipse Paho packages.
logging.level.org.springframework.integration.mqtt=INFO
logging.level.org.eclipse.paho=INFO

application.yml 설정 (application.properties 대신 YAML을 사용하는 경우)

# YAML form of the same keys and values shown in the application.properties listing.
# The spring.mqtt.* keys are read by MqttConfig through @Value injection.
spring:
  mqtt:
    # Broker URI. ${ip.host} is not defined in this listing; it must resolve
    # from another property source (TODO confirm where it is set).
    url: tcp://${ip.host}:1883
    # Base client ID; MqttConfig appends "-producer" / "-consumer" to it.
    client-id: ${spring.application.name}-${random.uuid}
    # Broker credentials (sample values).
    username: admin
    password: public
    # Topic configured as the outbound handler's default topic.
    default-topic: test/mqtt/topic
    # Passed to MqttConnectOptions.setConnectionTimeout (presumably seconds; see the Paho docs).
    timeout: 60
    # Passed to MqttConnectOptions.setKeepAliveInterval (presumably seconds; see the Paho docs).
    keep-alive: 60
    # Passed to MqttConnectOptions.setCleanSession.
    clean-session: true
    # QoS applied to both the outbound handler and the inbound adapter.
    qos: 1
# Log levels for the Spring Integration MQTT and Eclipse Paho packages.
logging:
  level:
    org.springframework.integration.mqtt: INFO
    org.eclipse.paho: INFO

코드 구현

Config – MqttConfig.java

MQTT 브로커 연결 설정과 송수신 채널을 정의합니다.

package com.lt.mqtt.config;

import com.lt.mqtt.constant.MqttConstant;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Spring configuration that wires the MQTT broker connection together with
 * one outbound (publish) and one inbound (subscribe) Spring Integration endpoint.
 *
 * <p>All settings come from the {@code spring.mqtt.*} properties injected below.
 */
@Configuration
public class MqttConfig {

    /** Broker URI, or several URIs separated by commas. */
    @Value("${spring.mqtt.url}")
    private String serverUrl;

    /** Base client ID; a role suffix is appended for each of the two MQTT clients. */
    @Value("${spring.mqtt.client-id}")
    private String clientIdentifier;

    @Value("${spring.mqtt.username}")
    private String user;

    @Value("${spring.mqtt.password}")
    private String pwd;

    /** Topic configured as the outbound handler's default topic. */
    @Value("${spring.mqtt.default-topic}")
    private String defaultTopic;

    @Value("${spring.mqtt.timeout}")
    private int connectionTimeout;

    @Value("${spring.mqtt.keep-alive}")
    private int keepAliveInterval;

    @Value("${spring.mqtt.clean-session}")
    private boolean cleanSession;

    /** QoS applied to both the outbound handler and the inbound adapter. */
    @Value("${spring.mqtt.qos}")
    private int qualityOfService;

    /**
     * Builds the Paho connection options from the injected properties.
     *
     * @return connection options with automatic reconnect enabled
     */
    @Bean
    public MqttConnectOptions configureMqttOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(parseServerUris(serverUrl));
        options.setUserName(user);
        options.setPassword(pwd.toCharArray());
        options.setConnectionTimeout(connectionTimeout);
        options.setKeepAliveInterval(keepAliveInterval);
        options.setCleanSession(cleanSession);
        options.setAutomaticReconnect(true);
        return options;
    }

    /**
     * Creates the client factory used by both the outbound handler and the inbound adapter.
     *
     * @return a Paho client factory configured with {@link #configureMqttOptions()}
     */
    @Bean
    public MqttPahoClientFactory buildMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(configureMqttOptions());
        return factory;
    }

    /**
     * Channel that application code sends to in order to publish an MQTT message.
     *
     * @return a new {@link DirectChannel} registered as {@code mqttSendChannel}
     */
    @Bean(name = "mqttSendChannel")
    public MessageChannel createSendChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound endpoint: consumes messages from {@code mqttSendChannel} and publishes them.
     *
     * @return the handler, using the configured default topic and QoS, in async mode
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttSendChannel")
    public MessageHandler mqttMessageHandler() {
        // The "-producer" suffix keeps this client ID distinct from the inbound adapter's.
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
            clientIdentifier + "-producer",
            buildMqttClientFactory()
        );
        handler.setDefaultQos(qualityOfService);
        handler.setDefaultTopic(defaultTopic);
        handler.setAsync(true);
        handler.setAsyncEvents(true);
        return handler;
    }

    /**
     * Channel on which messages received from the broker are delivered.
     *
     * @return a new {@link DirectChannel} registered as {@code mqttReceiveChannel}
     */
    @Bean(name = "mqttReceiveChannel")
    public MessageChannel createReceiveChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound endpoint: subscribes to the topics from {@link MqttConstant#getTopics()}
     * and forwards received messages to {@code mqttReceiveChannel}.
     *
     * @return the message-driven channel adapter
     */
    @Bean
    public MessageProducer mqttMessageProducer() {
        // The "-consumer" suffix keeps this client ID distinct from the outbound handler's.
        MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(
                clientIdentifier + "-consumer",
                buildMqttClientFactory(),
                fetchSubscribedTopics()
            );
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(qualityOfService);
        adapter.setOutputChannel(createReceiveChannel());
        return adapter;
    }

    /** Returns the topics the inbound adapter subscribes to. */
    private String[] fetchSubscribedTopics() {
        return MqttConstant.getTopics();
    }

    /**
     * Splits a comma-separated URI list, ignoring whitespace around each entry.
     *
     * <p>A plain {@code split(",")} would turn {@code "tcp://a:1883, tcp://b:1883"} into a
     * second entry with a leading space, which is not a valid broker URI.
     *
     * @param commaSeparatedUris one URI, or several separated by commas
     * @return the individual URIs with surrounding whitespace removed
     */
    private static String[] parseServerUris(String commaSeparatedUris) {
        return commaSeparatedUris.trim().split("\\s*,\\s*");
    }
}

Constant – MqttConstant.java

package com.lt.mqtt.constant;

/**
 * Topic names used by the MQTT configuration.
 *
 * <p>Holds static members only, so it is final and cannot be instantiated.
 */
public final class MqttConstant {

    /** The single topic returned by {@link #getTopics()}. */
    public static final String STANDARD_TOPIC = "test/mqtt/topic";

    private MqttConstant() {
        // Static members only; no instances.
    }

    /**
     * Returns the topics to subscribe to.
     *
     * @return a newly allocated array on every call, so a caller modifying it
     *         does not affect later callers
     */
    public static String[] getTopics() {
        return new String[] { STANDARD_TOPIC };
    }
}

Producer – MqttMessageSender.java

메시지 전송을 위한 유틸리티 클래스입니다.

package com.lt.mqtt.sender;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

/**
 * Publishes text payloads by sending messages to the {@code mqttSendChannel} bean.
 */
@Component
public class MqttMessageSender {

    @Autowired
    private MessageChannel mqttSendChannel;

    /**
     * Sends {@code content} without setting any MQTT headers.
     * Presumably the outbound handler's default topic and QoS then apply.
     *
     * @param content message payload
     */
    public void publish(String content) {
        dispatch(MessageBuilder.withPayload(content));
    }

    /**
     * Sends {@code content} with the {@code mqtt_topic} header set.
     *
     * @param topic   value for the {@code mqtt_topic} header
     * @param content message payload
     */
    public void publish(String topic, String content) {
        MessageBuilder<String> builder = MessageBuilder.withPayload(content);
        builder.setHeader("mqtt_topic", topic);
        dispatch(builder);
    }

    /**
     * Sends {@code content} with both the {@code mqtt_topic} and {@code mqtt_qos} headers set.
     *
     * @param topic    value for the {@code mqtt_topic} header
     * @param qosLevel value for the {@code mqtt_qos} header
     * @param content  message payload
     */
    public void publish(String topic, int qosLevel, String content) {
        MessageBuilder<String> builder = MessageBuilder.withPayload(content);
        builder.setHeader("mqtt_topic", topic);
        builder.setHeader("mqtt_qos", qosLevel);
        dispatch(builder);
    }

    /** Builds the message and hands it to the send channel. */
    private void dispatch(MessageBuilder<String> builder) {
        mqttSendChannel.send(builder.build());
    }
}

Consumer – MessageReceiver.java

수신된 메시지를 처리하는 리스너입니다.

package com.lt.mqtt.receiver;

import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageReceiver {

    @ServiceActivator(inputChannel = "mqttReceiveChannel")
    public void processIncomingMessage(Message<String> incomingMsg) {
        try {
            String payload = incomingMsg.getPayload();
            String topic = (String) incomingMsg.getHeaders().get("mqtt_receivedTopic");
            Integer qos = (Integer) incomingMsg.getHeaders().get("mqtt_receivedQos");

            log.info("수신된 메시지 본문: {}", payload);
            log.info("수신된 메시지 토픽: {}", topic);
            log.info("수신된 메시지 QoS: {}", qos);

            // 실제 비즈니스 로직을 여기에 구현
        } catch (Exception exception) {
            log.error("메시지 처리 중 오류 발생: {}", exception.getMessage(), exception);
        }
    }
}

테스트 코드

@Autowired
private MqttMessageSender mqttSender;

/**
 * Publishes ten numbered messages (suffix 0 through 9) through the single-argument
 * {@code publish} overload.
 */
@Test
public void testSendMqttMessage() {
    final int messageCount = 10;
    int index = 0;
    while (index < messageCount) {
        mqttSender.publish("테스트 메시지-" + index);
        index++;
    }
}

태그: Spring Boot MQTT Eclipse Paho Spring Integration 메시징

6월 25일 16:48에 게시됨