개발 환경 및 사전 준비
이 예제는 Spring Boot 2.7.18 버전을 기준으로 작성되었습니다. pom.xml에 아래와 같이 parent를 설정해야 합니다.
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
MQTT 의존성 설정
Spring Integration 기반의 MQTT 클라이언트를 사용합니다. 아래 두 개의 의존성을 추가하세요.
<!-- Spring Integration MQTT – 핵심 의존성 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Eclipse Paho – MQTT 클라이언트 라이브러리 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
application.properties 설정
spring.mqtt.url=tcp://${ip.host}:1883
spring.mqtt.client-id=${spring.application.name}-${random.uuid}
spring.mqtt.username=admin
spring.mqtt.password=public
spring.mqtt.default-topic=test/mqtt/topic
spring.mqtt.timeout=60
spring.mqtt.keep-alive=60
spring.mqtt.clean-session=true
spring.mqtt.qos=1
logging.level.org.springframework.integration.mqtt=INFO
logging.level.org.eclipse.paho=INFO
application.yml 설정 (application.properties 대신 YAML을 사용하는 경우)
spring:
  mqtt:
    url: tcp://${ip.host}:1883
    client-id: ${spring.application.name}-${random.uuid}
    username: admin
    password: public
    default-topic: test/mqtt/topic
    timeout: 60
    keep-alive: 60
    clean-session: true
    qos: 1
logging:
  level:
    org.springframework.integration.mqtt: INFO
    org.eclipse.paho: INFO
코드 구현
Config – MqttConfig.java
MQTT 브로커 연결 설정과 송수신 채널을 정의합니다.
package com.lt.mqtt.config;
import com.lt.mqtt.constant.MqttConstant;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring configuration for the MQTT example.
 *
 * <p>Declares the Paho connection options and client factory, an outbound
 * channel with its message handler, and an inbound channel with its
 * message-driven adapter. All tunables are injected from the
 * {@code spring.mqtt.*} properties.
 */
@Configuration
public class MqttConfig {

    /** Broker URL; several URLs may be supplied as a comma-separated list. */
    @Value("${spring.mqtt.url}")
    private String serverUrl;

    /** Base client id; a "-producer" / "-consumer" suffix is appended per endpoint. */
    @Value("${spring.mqtt.client-id}")
    private String clientIdentifier;

    /** User name passed to the connection options. */
    @Value("${spring.mqtt.username}")
    private String user;

    /** Password passed to the connection options. */
    @Value("${spring.mqtt.password}")
    private String pwd;

    /** Default topic configured on the outbound handler. */
    @Value("${spring.mqtt.default-topic}")
    private String defaultTopic;

    /** Value handed to {@code MqttConnectOptions#setConnectionTimeout}. */
    @Value("${spring.mqtt.timeout}")
    private int connectionTimeout;

    /** Value handed to {@code MqttConnectOptions#setKeepAliveInterval}. */
    @Value("${spring.mqtt.keep-alive}")
    private int keepAliveInterval;

    /** Value handed to {@code MqttConnectOptions#setCleanSession}. */
    @Value("${spring.mqtt.clean-session}")
    private boolean cleanSession;

    /** QoS applied as the outbound default and to the inbound adapter. */
    @Value("${spring.mqtt.qos}")
    private int qualityOfService;

    /**
     * Builds the connection options from the injected properties.
     *
     * @return options with server URIs, credentials, timeouts, clean-session
     *     flag and automatic reconnect enabled
     */
    @Bean
    public MqttConnectOptions configureMqttOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Trim the value and absorb whitespace around each comma so that a list
        // such as "tcp://a:1883, tcp://b:1883" does not yield a URI that starts
        // with a blank. A single URL produces the same one-element array as before.
        options.setServerURIs(serverUrl.trim().split("\\s*,\\s*"));
        options.setUserName(user);
        options.setPassword(pwd.toCharArray());
        options.setConnectionTimeout(connectionTimeout);
        options.setKeepAliveInterval(keepAliveInterval);
        options.setCleanSession(cleanSession);
        options.setAutomaticReconnect(true);
        return options;
    }

    /**
     * Creates the Paho client factory that uses {@link #configureMqttOptions()}.
     *
     * @return the client factory shared by the outbound handler and inbound adapter
     */
    @Bean
    public MqttPahoClientFactory buildMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(configureMqttOptions());
        return factory;
    }

    /**
     * Channel on which outgoing messages are placed.
     *
     * @return a new {@link DirectChannel} registered as {@code mqttSendChannel}
     */
    @Bean(name = "mqttSendChannel")
    public MessageChannel createSendChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound handler subscribed to {@code mqttSendChannel}.
     *
     * @return a handler with the configured default QoS and topic, with async
     *     sending and async events switched on
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttSendChannel")
    public MessageHandler mqttMessageHandler() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientIdentifier + "-producer",
                buildMqttClientFactory()
        );
        handler.setDefaultQos(qualityOfService);
        handler.setDefaultTopic(defaultTopic);
        handler.setAsync(true);
        handler.setAsyncEvents(true);
        return handler;
    }

    /**
     * Channel on which received messages are delivered.
     *
     * @return a new {@link DirectChannel} registered as {@code mqttReceiveChannel}
     */
    @Bean(name = "mqttReceiveChannel")
    public MessageChannel createReceiveChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter that subscribes to the topics from {@link MqttConstant#getTopics()}
     * and forwards messages to the receive channel.
     *
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducer mqttMessageProducer() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientIdentifier + "-consumer",
                        buildMqttClientFactory(),
                        fetchSubscribedTopics()
                );
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(qualityOfService);
        adapter.setOutputChannel(createReceiveChannel());
        return adapter;
    }

    /** Returns the topics the inbound adapter subscribes to. */
    private String[] fetchSubscribedTopics() {
        return MqttConstant.getTopics();
    }
}
Constant – MqttConstant.java
package com.lt.mqtt.constant;
/**
 * Holder for the MQTT topic names used by this example.
 *
 * <p>The class exposes static members only, so it is {@code final} and has a
 * private constructor to prevent instantiation and subclassing.
 */
public final class MqttConstant {

    /** The single topic returned by {@link #getTopics()}. */
    public static final String STANDARD_TOPIC = "test/mqtt/topic";

    private MqttConstant() {
        // Static members only; no instances.
    }

    /**
     * Returns the example's topics.
     *
     * @return a newly allocated array containing {@link #STANDARD_TOPIC};
     *     modifying it does not affect later calls
     */
    public static String[] getTopics() {
        return new String[] { STANDARD_TOPIC };
    }
}
Producer – MqttMessageSender.java
메시지 전송을 위한 유틸리티 클래스입니다.
package com.lt.mqtt.sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
/**
 * Component that publishes String payloads by sending messages to the
 * injected {@code mqttSendChannel}.
 */
@Component
public class MqttMessageSender {

    /** Channel every outgoing message is sent to; injected by Spring. */
    @Autowired
    private MessageChannel mqttSendChannel;

    /**
     * Sends the payload without setting any topic or QoS header.
     *
     * @param content message payload
     */
    public void publish(String content) {
        MessageBuilder<String> outgoing = MessageBuilder.withPayload(content);
        mqttSendChannel.send(outgoing.build());
    }

    /**
     * Sends the payload with the {@code mqtt_topic} header set.
     *
     * @param topic value of the {@code mqtt_topic} header
     * @param content message payload
     */
    public void publish(String topic, String content) {
        MessageBuilder<String> outgoing = MessageBuilder.withPayload(content);
        outgoing.setHeader("mqtt_topic", topic);
        mqttSendChannel.send(outgoing.build());
    }

    /**
     * Sends the payload with both the {@code mqtt_topic} and {@code mqtt_qos}
     * headers set.
     *
     * @param topic value of the {@code mqtt_topic} header
     * @param qosLevel value of the {@code mqtt_qos} header
     * @param content message payload
     */
    public void publish(String topic, int qosLevel, String content) {
        MessageBuilder<String> outgoing = MessageBuilder.withPayload(content);
        outgoing.setHeader("mqtt_topic", topic);
        outgoing.setHeader("mqtt_qos", qosLevel);
        mqttSendChannel.send(outgoing.build());
    }
}
Consumer – MessageReceiver.java
수신된 메시지를 처리하는 리스너입니다.
package com.lt.mqtt.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageReceiver {
@ServiceActivator(inputChannel = "mqttReceiveChannel")
public void processIncomingMessage(Message<String> incomingMsg) {
try {
String payload = incomingMsg.getPayload();
String topic = (String) incomingMsg.getHeaders().get("mqtt_receivedTopic");
Integer qos = (Integer) incomingMsg.getHeaders().get("mqtt_receivedQos");
log.info("수신된 메시지 본문: {}", payload);
log.info("수신된 메시지 토픽: {}", topic);
log.info("수신된 메시지 QoS: {}", qos);
// 실제 비즈니스 로직을 여기에 구현
} catch (Exception exception) {
log.error("메시지 처리 중 오류 발생: {}", exception.getMessage(), exception);
}
}
}
테스트 코드
/** Sender under test; injected by Spring. */
@Autowired
private MqttMessageSender mqttSender;

/**
 * Publishes ten messages whose payloads end in the indices 0 through 9,
 * using the single-argument {@code publish} overload.
 */
@Test
public void testSendMqttMessage() {
    final int messageCount = 10;
    int index = 0;
    while (index < messageCount) {
        String body = "테스트 메시지-" + index;
        mqttSender.publish(body);
        index++;
    }
}