Spring Boot 기반 EMQ X 4.0 연동 및 중규모 IoT 아키텍처 설계 방안

1. 기술적 배경 및 도입 효과

1.1 EMQ X 4.0 의 핵심 기능

EMQ X 4.0 은 Erlang/OTP 생태계 위에서 작동하는 MQTT 브로커로, 수천만 대의 동시 연결 처리와 초당 수백만 건의 메시지 전송 능력을 갖췄습니다. 이전 세대 대비 분산형 데이터베이스인 Mnesia 를 기반으로 하여 ACID 특성을 보장하며, 클러스터 간 노드들이 풀 메쉬(Pull-Mesh) 토폴로지로 통신하여 장애 허용성과 확장성을 강화했습니다.

보안 측면에서는 X.509 인증서, JWT, 그리고 HTTP 기반 플러그인 방식을 지원하여 다양한 접근 제어 전략을 수립할 수 있습니다. 특히 SQL 유사 문법을 활용한 규칙 엔진을 내장하고 있어 코딩 없이 실시간 데이터 가공과 라우팅이 가능합니다.

1.2 스프링 부트와의 시너지

스프링 부트는 빠른 개발 주기와 풍부한 생태계를 제공하며, 이를 EMQ X 에 결합하면 비즈니스 로직과 인프라 관리 간의 경계를 명확히 할 수 있습니다. 의존성 주입(DI) 기능을 활용하여 클라이언트 관리 코드를 단순화하고, 액추에이터(Actuator) 를 통해 시스템 상태를 실시간으로 모니터링할 수 있는 장점이 있습니다.

중규모 시스템 환경에서는 데이터 무결성과 처리 지연 시간을 동시에 고려해야 하며, 본 아키텍처는 동기식 및 비동기식 쓰기 모드, 배치 최적화를 통해 이러한 요구사항을 충족합니다.

2. 초기 환경 설정 및 의존성 관리

2.1 빌드 도구 구성

프로젝트에 MQTT 프로토콜 지원을 위해 파호(Paho) 클라이언트 라이브러리와 스프링 인테그레이션을 포함해야 합니다. 마빈(Maven) 사용자의 경우 다음과 같은 설정을 적용합니다.

<dependencies>
  <!-- 웹 서버 기본 시작점 -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <!-- MQTT 클라이언트 커넥션 -->
  <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
  </dependency>

  <!-- 메시징 통합 프레임워크 -->
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
  </dependency>

  <!-- 시스템 헬스 체크 -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
</dependencies>

2.2 애플리케이션 속성 정의

브로커 연결 정보를 외부 파일에서 분리하여 유연한 배포를 지원합니다. application.yml 형식으로 아래 값을 기록합니다.

broker:
  endpoint: tcp://localhost:1883
  identity: spring-client-id
  credentials:
    username: admin
    password: secure_pwd
  options:
    keepalive: 60
    clean: true
    auto-reconnect: enabled

3. 클라이언트 생명주기와 연결 관리

클라이언트 인스턴스는 단일톤(Singleton) 으로 등록하여 자원 누수를 방지합니다. 별도의 설정 클래스를 만들어 빈(Bean) 을 생성합니다.

@Configuration
@RequiredArgsConstructor
public class IotConnectionManager {

    private final BrokerConfig properties;

    @Bean
    public MqttClient createBrokerClient() throws MqttException {
        String brokerUrl = properties.getEndpoint();
        String clientId = properties.getIdentity();

        // 메모리 기반 영속성 사용
        var client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        
        var connectOpts = new MqttConnectOptions();
        connectOpts.setUserName(properties.getCredentials().getUsername());
        connectOpts.setPassword(properties.getCredentials().getPassword().toCharArray());
        connectOpts.setKeepAliveInterval(properties.getOptions().getKeepalive());
        connectOpts.setCleanSession(properties.getOptions().isClean());

        // 이벤트 리스너 등록
        client.setCallback(new ConnectionListener(connectOpts));
        client.connect(connectOpts);

        return client;
    }
    
    static class ConnectionListener extends DefaultMqttCallback {
        @Override
        public void connectionLost(Throwable e) {
            log.error("브로커 연결이 끊어졌습니다.", e);
        }
        // ... messageArrived 과 deliveryComplete 구현 생략
    }
}

4. 데이터 전송 전략 최적화

4.1 개별 객체 직렬화 발행

전송 데이터는 JSON 형태로 직렬화된 후 바이트 배열로 변환됩니다. 자바 객체를 메시지 본문으로 매핑하는 서비스를 분리합니다.

@Service
public class MessageTransmissionHandler {

    private final MqttClient client;
    private final ObjectMapper jsonMapper;

    public MessageTransmissionHandler(MqttClient client, ObjectMapper jsonMapper) {
        this.client = client;
        this.jsonMapper = jsonMapper;
    }

    public <T> void sendObject(String channel, T dataObj, int qualityLevel) throws Exception {
        String jsonData = jsonMapper.writeValueAsString(dataObj);
        var message = new MqttMessage(jsonData.getBytes(StandardCharsets.UTF_8));
        message.setQos(qualityLevel);

        client.publish(channel, message);
    }
}

4.2 배치 처리 및 성능 향상

수천 개의 메시지를 일일이 전송하지 않고 배치 단위로 묶어 네트워크 오버헤드를 줄입니다. 특히 QoS 0 레벨을 사용하여 신뢰도가 낮지만 속도 우선인 데이터를 처리할 때 유리합니다.

public BatchReport submitBatch(String baseTopic, List<String> payloads) throws Exception {
    int batchSize = 500;
    var report = new BatchReport();
    
    for (int i = 0; i < payloads.size(); i += batchSize) {
        var subList = payloads.subList(i, Math.min(i + batchSize, payloads.size()));
        var timer = Timer.start();
        
        for (var data : subList) {
            client.publish(baseTopic + "/" + UUID.randomUUID(), new MqttMessage(data.getBytes()));
        }
        
        report.addDetail(subList.size(), timer.durationMillis());
        // 네트워크 대기 시간 조절
        Thread.sleep(50); 
    }
    return report;
}

4.3 동기 및 비동기 호출 패턴

비즈니스 중요도에 따라 응답 대기 여부를 결정합니다. 중요한 명령은 동기적으로 결과를 확인하고, 상태 업데이트는 콜백 함수를 이용한 비동기 방식으로 처리합니다.

// 비동기 성공/실패 콜백
IMqttActionListener successCb = token -> System.out.println("전송 완료: " + token.getMessageId());
IMqttActionListener failCb = (token, ex) -> log.error("전송 실패", ex);

client.publish(topic, message, null, (success ? successCb : failCb));

5. 시계열 데이터 분석 및 쿼리

저장된 MQTT 데이터는 보통 인플럭스 DB(InfluxDB) 와 같은 시계열 데이터베이스로 이동하며, 플럭스(Flux) 언어를 사용하여 조회합니다.

5.1 데이터 조회 서비스

쿼리 문장을 가변적으로 생성하여 특정 시간 범위와 태그 필터를 적용합니다.

@Service
public class TemporalMetricsFetcher {

    private final QueryApi fluxQueryClient;

    public List<FluxTable> retrieveByRange(String bucketName, String metricName, Duration window) {
        String query = String.format("""
                from(bucket: "%s")
                  |> range(start: %s)
                  |> filter(fn: (r) => r._measurement == "%s")
            """, bucketName, "-" + window.toMinutes() + "m", metricName);
        
        return fluxQueryClient.query(query);
    }
}

5.2 집계 및 통계 계산

원본 데이터를 필터링한 후 평균값이나 최대치를 구하는 집계 작업을 수행합니다.

public double getAverageTemperature(String deviceId, Instant startTime) {
    String sql = String.format("""
            from(bucket: "sensor_data")
              |> range(start: %s)
              |> filter(fn: (r) => r._field == "temperature" && r.device_id == "%s")
              |> mean()
        """, startTime.toString(), deviceId);
    
    var tables = fluxQueryClient.query(sql);
    // 첫 번째 레코드의 값 추출
    return tables.stream()
        .flatMap(t -> t.getRecords().stream())
        .findFirst()
        .map(r -> Double.parseDouble(r.getValue().toString()))
        .orElse(0.0);
}

6. 알림 및 예외 처리 체계

6.1 임계치 기반 경고 생성

규칙 엔진과 알림 시스템을 연동하여 특정 값이 임계치를 초과할 때 HTTP 엔드포인트로 알람을 발송합니다.

@Service
public class AlertOrchestrator {

    private final TaskScheduler scheduler;
    private final NotificationEngine notificationEngine;

    public void setupThresholdMonitor(String ruleName, double limit) {
        CheckDefinition check = new CheckDefinition();
        check.setName(ruleName);
        check.setCondition(String.format("_value > %.2f", limit));
        
        scheduler.scheduleAtFixedRate(() -> evaluateCheck(check), Duration.ofMinutes(5));
    }
    
    private void evaluateCheck(CheckDefinition def) {
        // 쿼리 결과에 따른 알림 발송 로직
        if (exceededLimit(def)) {
            notificationEngine.sendHttpNotification(buildAlertPayload(def));
        }
    }
}

6.2 중복 알림 억제

짧은 시간 내에 동일한 오류가 반복되는 경우 알림을 일시 중지하여 운영 팀에게 과도한 부하를 주지 않도록 합니다.

@Service
public class NotificationSuppressor {
    private final Map lastSentTime = ConcurrentHashMap.newKeySet();
    
    public boolean allowNotification(String alertId) {
        long current = System.currentTimeMillis();
        long interval = 30 * 60 * 1000; // 30 분
        
        synchronized(lastSentTime) {
            Long prev = lastSentTime.putIfAbsent(alertId, current);
            if (prev != null && current - prev < interval) {
                return false; // 중복 억제
            }
        }
        return true;
    }
}

7. 확장성 및 고가용성 설계

클라이언트 연결이 증가할 때 발생 가능한 병목 현상을 해결하기 위해 연결 пул(pool) 을 사용하고, 로드 밸런싱 전략을 적용합니다.

7.1 연결 풀 관리

빈번한 연결 단절 및 재접속 비용을 줄이기 위해 사전 생성된 클라이언트 인스턴스를 재사용합니다.

@Component
public class BrokerConnectionPool {
    private final BlockingQueue<MqttClient> availableClients;
    
    public MqttClient acquire() {
        // 풀에서 유효한 클라이언트 반환, 없을 시 생성
    }
    
    public void release(MqttClient client) {
        // 해제되어 다시 푸lish 추가
    }
}

7.2 샤딩 전략

대량의 디바이스를 처리할 때 데이터를 여러 브로커 인스턴스로 분산시키기 위해 해싱 기반 샤딩을 사용합니다.

public String resolveShardedTopic(String deviceId, int shardCount) {
    int hash = Math.abs(deviceId.hashCode());
    int shardIndex = hash % shardCount;
    return "shard-" + shardIndex + "/data";
}

8. 영속성 및 백업 복구

중요한 IoT 데이터는 브로커 내부 저장소뿐만 아니라 외부 RDBMS 나 시계열 DB 에 이중으로 저장하여 복구를 보장합니다.

8.1 하이브리드 스토리지 모델

메타데이터는 MySQL 에, 센서 로그는 InfluxDB 에 저장하는 구조를 채택합니다.

@Transactional
public void saveMetricData(DeviceInfo device, SensorReading reading) {
    // 1. 관계형 DB 에 장치 상태 저장
    jdbcTemplate.update("INSERT INTO devices_info ...", device.getId());
    
    // 2. 시계열 DB 에 측정값 저장
    influxWriter.writePoint(measurementBuilder.build(device, reading));
}

8.2 자동 백업 스케줄링

정기적으로 EMQ X 클러스터 설정과 메타데이터를 백업하여 재해 복구 용도로 보관합니다.

@Scheduled(cron = "0 0 2 * * ?") // 매일 새벽 2 시
public void backupClusterState() throws IOException {
    String path = "/backup/emqx_" + LocalDate.now() + ".tar.gz";
    Runtime.getRuntime().exec("emqx_ctl mnesia backup " + path);
}

9. 기기 등록 및 제어 관리

IoT 장비의 생애주기를 관리하기 위해 전용 컨트롤러와 서비스 계층을 분리합니다. 기기 인증 정보는 EMQ X Auth API 를 통해 동기화합니다.

@RestController
@RequestMapping("/api/devices")
public class DeviceAdminController {
    
    @PostMapping
    public ResponseEntity register(@RequestBody RegistrationRequest req) {
        String secret = generateSecret();
        deviceRepo.save(req.getDeviceId(), secret);
        emqxAuthClient.createCredential(req.getDeviceId(), secret);
        return ResponseEntity.ok(Map.of("secret", secret));
    }
}

10. 규칙 엔진 통합

복잡한 비즈니스 로직을 코드 대신 EMQ X 규칙 엔진에 위임하여 중앙 집중식 데이터 처리를 실현합니다.

10.1 SQL 기반 데이터 라우팅

특정 조건을 만족하는 메시지를 즉시 타겟 서비스나 다른 메시징 큐로 전환합니다.

/*
SELECT device_id, temperature FROM sensor_data WHERE temperature > 40
ACTION: Forward to Kafka Topic 'critical-alerts'
*/

public RuleDef buildCriticalTempRule() {
    ResourceDef httpRes = resourceClient.create(HttpResource.class, "http://monitor/api/alert");
    String sql = "SELECT payload.* FROM \"device/#\" WHERE payload.temp > 40";
    
    return ruleClient.create(sql, Action.forwardTo(httpRes.getId()));
}

10.2 데이터 정제 파이프라인

무효한 데이터를 제거하고 표준 포맷으로 변환하는 과정을 규칙 엔진에서 처리합니다.

/*
WHERE temp BETWEEN -40 AND 85 AND humidity BETWEEN 0 AND 100
TRANSFORM: Convert timestamp to UTC
OUTPUT: Push to TimeSeries DB
*/

태그: spring-boot emqx MQTT influxdb JVM

6월 27일 22:35에 게시됨