위협 인텔리전스의 효과적인 가시화는 현대 보안 운영 센터에서 중요한 역할을 합니다. MISP에 저장된 대량의 IOC 데이터와 Elasticsearch의 강력한 검색 및 분석 기능을 결합하면, Kibana의 유연한 대시보드 설계를 통해 위협 탐지부터 대응까지 완전한 루프를 구축할 수 있습니다. 이 문서에서는 이러한 기술 스택을 실제로 통합하는 방법을 자세히 설명합니다.
1. 환경 구성 및 아키텍처 설계
기술 통합을 시작하기 전, 전체 데이터 흐름의 아키텍처를 명확히 설계해야 합니다. 일반적인 연계 방식은 다음과 같은 세 계층 구조로 이루어집니다:
- 데이터 수집 계층: MISP가 위협 인텔리전스 소스로 작용하며, API 또는 정기적인 내보내기를 통해 구조화된 IOC 데이터를 제공
- 데이터 처리 계층: Elasticsearch 클러스터가 IOC 데이터의 색인 및 저장을 담당하며, Logstash는 필요 시 데이터 변환 용도로 사용 가능
- 표시 및 분석 계층: Kibana는 가시화 분석을 위해 사용되며, 사용자 정의 대시보드와 경고 규칙을 지원
권장되는 기본 구성 요소 버전은 다음과 같습니다:
MISP ≥ 2.4.170
Elasticsearch 7.17.x
Kibana 7.17.x
Python ≥ 3.8
주의: 프로덕션 환경에서는 Elasticsearch를 세 노드 클러스터로 배포하여 데이터의 고가용성을 보장해야 합니다. 또한 MISP와 Elasticsearch 간의 통신은 내부 네트워크를 통해 이루어져야 하며, TLS 암호화를 사용하는 것이 좋습니다.
2. Elasticsearch 색인 템플릿 설정
효율적인 IOC 데이터 쿼리의 핵심은 적절한 색인 템플릿입니다. MISP의 일반적인 속성 유형에 따라 다음 매핑 템플릿을 설계할 수 있습니다:
PUT _template/misp_iocs_template
{
"index_patterns": ["misp_iocs*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"event_time": {"type": "date"},
"incident_id": {"type": "keyword"},
"ioc_category": {"type": "keyword"},
"ioc_identifier": {
"type": "text",
"fields": {"exact": {"type": "keyword", "ignore_above": 256}}
},
"labels": {"type": "keyword"},
"risk_level": {"type": "byte"},
"discovery_date": {"type": "date"},
"modification_date": {"type": "date"},
"description": {"type": "text"},
"related_clusters": {
"type": "nested",
"properties": {
"cluster_name": {"type": "keyword"},
"cluster_info": {"type": "text"}
}
}
}
}
}
주요 필드 설명:
| 필드명 | 타입 | 설명 |
|---|---|---|
| ioc_category | keyword | IOC 카테고리(ip-dst, domain, md5 등) |
| ioc_identifier | text/keyword | 실제 IOC 값 |
| risk_level | byte | 위험 수준(1-4) |
| related_clusters | nested | MITRE ATT&CK과 같은 관련 데이터 |
색인을 생성한 후에는 아래 명령어로 템플릿 적용 상태를 확인할 수 있습니다:
GET _template/misp_iocs_template
다음으로 Python 스크립트를 작성하여 MISP 데이터를 Elasticsearch로 전송하는 예제를 살펴보겠습니다.
import requests
from elasticsearch import Elasticsearch
# Elasticsearch 연결 설정
es = Elasticsearch(["http://localhost:9200"])
def fetch_misp_data(api_url, key):
headers = {"Authorization": key, "Accept": "application/json"}
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception("MISP 데이터 가져오기 실패")
def index_data_to_es(data_list):
for data in data_list:
es.index(index="misp_iocs", body=data)
if __name__ == "__main__":
misp_api_url = "https://your-misp-instance.com/attributes/restSearch"
misp_api_key = "your-api-key"
try:
misp_data = fetch_misp_data(misp_api_url, misp_api_key)
formatted_data = [
{
"event_time": item["timestamp"],
"incident_id": item["event_id"],
"ioc_category": item["type"],
"ioc_identifier": item["value"],
"labels": item["tags"],
"risk_level": item["threat_level"],
"discovery_date": item["first_seen"],
"modification_date": item["last_seen"],
"description": item["comment"],
"related_clusters": [{"cluster_name": cluster["name"], "cluster_info": cluster["description"]} for cluster in item.get("galaxy_clusters", [])]
}
for item in misp_data["response"]["Attribute"]
]
index_data_to_es(formatted_data)
print("데이터 전송 완료")
except Exception as e:
print(f"오류 발생: {e}")
위 스크립트는 MISP에서 데이터를 가져와 Elasticsearch에 색인하는 과정을 자동화합니다.