1. 기술적 배경 및 도입 효과
1.1 EMQ X 4.0 의 핵심 기능
EMQ X 4.0 은 Erlang/OTP 생태계 위에서 작동하는 MQTT 브로커로, 수천만 대의 동시 연결 처리와 초당 수백만 건의 메시지 전송 능력을 갖췄습니다. 이전 세대 대비 분산형 데이터베이스인 Mnesia 를 기반으로 하여 ACID 특성을 보장하며, 클러스터 간 노드들이 풀 메쉬(Pull-Mesh) 토폴로지로 통신하여 장애 허용성과 확장성을 강화했습니다.
보안 측면에서는 X.509 인증서, JWT, 그리고 HTTP 기반 플러그인 방식을 지원하여 다양한 접근 제어 전략을 수립할 수 있습니다. 특히 SQL 유사 문법을 활용한 규칙 엔진을 내장하고 있어 코딩 없이 실시간 데이터 가공과 라우팅이 가능합니다.
1.2 스프링 부트와의 시너지
스프링 부트는 빠른 개발 주기와 풍부한 생태계를 제공하며, 이를 EMQ X 에 결합하면 비즈니스 로직과 인프라 관리 간의 경계를 명확히 할 수 있습니다. 의존성 주입(DI) 기능을 활용하여 클라이언트 관리 코드를 단순화하고, 액추에이터(Actuator) 를 통해 시스템 상태를 실시간으로 모니터링할 수 있는 장점이 있습니다.
중규모 시스템 환경에서는 데이터 무결성과 처리 지연 시간을 동시에 고려해야 하며, 본 아키텍처는 동기식 및 비동기식 쓰기 모드, 배치 최적화를 통해 이러한 요구사항을 충족합니다.
2. 초기 환경 설정 및 의존성 관리
2.1 빌드 도구 구성
프로젝트에 MQTT 프로토콜 지원을 위해 파호(Paho) 클라이언트 라이브러리와 스프링 인테그레이션을 포함해야 합니다. 마빈(Maven) 사용자의 경우 다음과 같은 설정을 적용합니다.
<dependencies>
<!-- 웹 서버 기본 시작점 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- MQTT 클라이언트 커넥션 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- 메시징 통합 프레임워크 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- 시스템 헬스 체크 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
2.2 애플리케이션 속성 정의
브로커 연결 정보를 외부 파일에서 분리하여 유연한 배포를 지원합니다. application.yml 형식으로 아래 값을 기록합니다.
broker:
endpoint: tcp://localhost:1883
identity: spring-client-id
credentials:
username: admin
password: secure_pwd
options:
keepalive: 60
clean: true
auto-reconnect: enabled
3. 클라이언트 생명주기와 연결 관리
클라이언트 인스턴스는 단일톤(Singleton) 으로 등록하여 자원 누수를 방지합니다. 별도의 설정 클래스를 만들어 빈(Bean) 을 생성합니다.
@Configuration
@RequiredArgsConstructor
public class IotConnectionManager {
private final BrokerConfig properties;
@Bean
public MqttClient createBrokerClient() throws MqttException {
String brokerUrl = properties.getEndpoint();
String clientId = properties.getIdentity();
// 메모리 기반 영속성 사용
var client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
var connectOpts = new MqttConnectOptions();
connectOpts.setUserName(properties.getCredentials().getUsername());
connectOpts.setPassword(properties.getCredentials().getPassword().toCharArray());
connectOpts.setKeepAliveInterval(properties.getOptions().getKeepalive());
connectOpts.setCleanSession(properties.getOptions().isClean());
// 이벤트 리스너 등록
client.setCallback(new ConnectionListener(connectOpts));
client.connect(connectOpts);
return client;
}
static class ConnectionListener extends DefaultMqttCallback {
@Override
public void connectionLost(Throwable e) {
log.error("브로커 연결이 끊어졌습니다.", e);
}
// ... messageArrived 과 deliveryComplete 구현 생략
}
}
4. 데이터 전송 전략 최적화
4.1 개별 객체 직렬화 발행
전송 데이터는 JSON 형태로 직렬화된 후 바이트 배열로 변환됩니다. 자바 객체를 메시지 본문으로 매핑하는 서비스를 분리합니다.
@Service
public class MessageTransmissionHandler {
private final MqttClient client;
private final ObjectMapper jsonMapper;
public MessageTransmissionHandler(MqttClient client, ObjectMapper jsonMapper) {
this.client = client;
this.jsonMapper = jsonMapper;
}
public <T> void sendObject(String channel, T dataObj, int qualityLevel) throws Exception {
String jsonData = jsonMapper.writeValueAsString(dataObj);
var message = new MqttMessage(jsonData.getBytes(StandardCharsets.UTF_8));
message.setQos(qualityLevel);
client.publish(channel, message);
}
}
4.2 배치 처리 및 성능 향상
수천 개의 메시지를 일일이 전송하지 않고 배치 단위로 묶어 네트워크 오버헤드를 줄입니다. 특히 QoS 0 레벨을 사용하여 신뢰도가 낮지만 속도 우선인 데이터를 처리할 때 유리합니다.
public BatchReport submitBatch(String baseTopic, List<String> payloads) throws Exception {
int batchSize = 500;
var report = new BatchReport();
for (int i = 0; i < payloads.size(); i += batchSize) {
var subList = payloads.subList(i, Math.min(i + batchSize, payloads.size()));
var timer = Timer.start();
for (var data : subList) {
client.publish(baseTopic + "/" + UUID.randomUUID(), new MqttMessage(data.getBytes()));
}
report.addDetail(subList.size(), timer.durationMillis());
// 네트워크 대기 시간 조절
Thread.sleep(50);
}
return report;
}
4.3 동기 및 비동기 호출 패턴
비즈니스 중요도에 따라 응답 대기 여부를 결정합니다. 중요한 명령은 동기적으로 결과를 확인하고, 상태 업데이트는 콜백 함수를 이용한 비동기 방식으로 처리합니다.
// 비동기 성공/실패 콜백
IMqttActionListener successCb = token -> System.out.println("전송 완료: " + token.getMessageId());
IMqttActionListener failCb = (token, ex) -> log.error("전송 실패", ex);
client.publish(topic, message, null, (success ? successCb : failCb));
5. 시계열 데이터 분석 및 쿼리
저장된 MQTT 데이터는 보통 인플럭스 DB(InfluxDB) 와 같은 시계열 데이터베이스로 이동하며, 플럭스(Flux) 언어를 사용하여 조회합니다.
5.1 데이터 조회 서비스
쿼리 문장을 가변적으로 생성하여 특정 시간 범위와 태그 필터를 적용합니다.
@Service
public class TemporalMetricsFetcher {
private final QueryApi fluxQueryClient;
public List<FluxTable> retrieveByRange(String bucketName, String metricName, Duration window) {
String query = String.format("""
from(bucket: "%s")
|> range(start: %s)
|> filter(fn: (r) => r._measurement == "%s")
""", bucketName, "-" + window.toMinutes() + "m", metricName);
return fluxQueryClient.query(query);
}
}
5.2 집계 및 통계 계산
원본 데이터를 필터링한 후 평균값이나 최대치를 구하는 집계 작업을 수행합니다.
public double getAverageTemperature(String deviceId, Instant startTime) {
String sql = String.format("""
from(bucket: "sensor_data")
|> range(start: %s)
|> filter(fn: (r) => r._field == "temperature" && r.device_id == "%s")
|> mean()
""", startTime.toString(), deviceId);
var tables = fluxQueryClient.query(sql);
// 첫 번째 레코드의 값 추출
return tables.stream()
.flatMap(t -> t.getRecords().stream())
.findFirst()
.map(r -> Double.parseDouble(r.getValue().toString()))
.orElse(0.0);
}
6. 알림 및 예외 처리 체계
6.1 임계치 기반 경고 생성
규칙 엔진과 알림 시스템을 연동하여 특정 값이 임계치를 초과할 때 HTTP 엔드포인트로 알람을 발송합니다.
@Service
public class AlertOrchestrator {
private final TaskScheduler scheduler;
private final NotificationEngine notificationEngine;
public void setupThresholdMonitor(String ruleName, double limit) {
CheckDefinition check = new CheckDefinition();
check.setName(ruleName);
check.setCondition(String.format("_value > %.2f", limit));
scheduler.scheduleAtFixedRate(() -> evaluateCheck(check), Duration.ofMinutes(5));
}
private void evaluateCheck(CheckDefinition def) {
// 쿼리 결과에 따른 알림 발송 로직
if (exceededLimit(def)) {
notificationEngine.sendHttpNotification(buildAlertPayload(def));
}
}
}
6.2 중복 알림 억제
짧은 시간 내에 동일한 오류가 반복되는 경우 알림을 일시 중지하여 운영 팀에게 과도한 부하를 주지 않도록 합니다.
@Service
public class NotificationSuppressor {
private final Map lastSentTime = ConcurrentHashMap.newKeySet();
public boolean allowNotification(String alertId) {
long current = System.currentTimeMillis();
long interval = 30 * 60 * 1000; // 30 분
synchronized(lastSentTime) {
Long prev = lastSentTime.putIfAbsent(alertId, current);
if (prev != null && current - prev < interval) {
return false; // 중복 억제
}
}
return true;
}
}
7. 확장성 및 고가용성 설계
클라이언트 연결이 증가할 때 발생 가능한 병목 현상을 해결하기 위해 연결 пул(pool) 을 사용하고, 로드 밸런싱 전략을 적용합니다.
7.1 연결 풀 관리
빈번한 연결 단절 및 재접속 비용을 줄이기 위해 사전 생성된 클라이언트 인스턴스를 재사용합니다.
@Component
public class BrokerConnectionPool {
private final BlockingQueue<MqttClient> availableClients;
public MqttClient acquire() {
// 풀에서 유효한 클라이언트 반환, 없을 시 생성
}
public void release(MqttClient client) {
// 해제되어 다시 푸lish 추가
}
}
7.2 샤딩 전략
대량의 디바이스를 처리할 때 데이터를 여러 브로커 인스턴스로 분산시키기 위해 해싱 기반 샤딩을 사용합니다.
public String resolveShardedTopic(String deviceId, int shardCount) {
int hash = Math.abs(deviceId.hashCode());
int shardIndex = hash % shardCount;
return "shard-" + shardIndex + "/data";
}
8. 영속성 및 백업 복구
중요한 IoT 데이터는 브로커 내부 저장소뿐만 아니라 외부 RDBMS 나 시계열 DB 에 이중으로 저장하여 복구를 보장합니다.
8.1 하이브리드 스토리지 모델
메타데이터는 MySQL 에, 센서 로그는 InfluxDB 에 저장하는 구조를 채택합니다.
@Transactional
public void saveMetricData(DeviceInfo device, SensorReading reading) {
// 1. 관계형 DB 에 장치 상태 저장
jdbcTemplate.update("INSERT INTO devices_info ...", device.getId());
// 2. 시계열 DB 에 측정값 저장
influxWriter.writePoint(measurementBuilder.build(device, reading));
}
8.2 자동 백업 스케줄링
정기적으로 EMQ X 클러스터 설정과 메타데이터를 백업하여 재해 복구 용도로 보관합니다.
@Scheduled(cron = "0 0 2 * * ?") // 매일 새벽 2 시
public void backupClusterState() throws IOException {
String path = "/backup/emqx_" + LocalDate.now() + ".tar.gz";
Runtime.getRuntime().exec("emqx_ctl mnesia backup " + path);
}
9. 기기 등록 및 제어 관리
IoT 장비의 생애주기를 관리하기 위해 전용 컨트롤러와 서비스 계층을 분리합니다. 기기 인증 정보는 EMQ X Auth API 를 통해 동기화합니다.
@RestController
@RequestMapping("/api/devices")
public class DeviceAdminController {
@PostMapping
public ResponseEntity register(@RequestBody RegistrationRequest req) {
String secret = generateSecret();
deviceRepo.save(req.getDeviceId(), secret);
emqxAuthClient.createCredential(req.getDeviceId(), secret);
return ResponseEntity.ok(Map.of("secret", secret));
}
}
10. 규칙 엔진 통합
복잡한 비즈니스 로직을 코드 대신 EMQ X 규칙 엔진에 위임하여 중앙 집중식 데이터 처리를 실현합니다.
10.1 SQL 기반 데이터 라우팅
특정 조건을 만족하는 메시지를 즉시 타겟 서비스나 다른 메시징 큐로 전환합니다.
/*
SELECT device_id, temperature FROM sensor_data WHERE temperature > 40
ACTION: Forward to Kafka Topic 'critical-alerts'
*/
public RuleDef buildCriticalTempRule() {
ResourceDef httpRes = resourceClient.create(HttpResource.class, "http://monitor/api/alert");
String sql = "SELECT payload.* FROM \"device/#\" WHERE payload.temp > 40";
return ruleClient.create(sql, Action.forwardTo(httpRes.getId()));
}
10.2 데이터 정제 파이프라인
무효한 데이터를 제거하고 표준 포맷으로 변환하는 과정을 규칙 엔진에서 처리합니다.
/*
WHERE temp BETWEEN -40 AND 85 AND humidity BETWEEN 0 AND 100
TRANSFORM: Convert timestamp to UTC
OUTPUT: Push to TimeSeries DB
*/