Wazuh-Daten im Einklang: Vom nativen Format zum Splunk CIM

Vorschlag zur Transformation von Wazuh-Alerts in das Splunk Common Information Model (CIM).

Wazuh-Daten im Einklang: Vom nativen Format zum Splunk CIM

Im Bereich der Security Information and Event Management (SIEM) Systeme ist die Struktur von Log-Daten von entscheidender Bedeutung. Ähnlich wie bei der Log-Normalisierung in Graylog stellt sich auch bei Wazuh die Frage nach dem optimalen Datenmodell. Dieser Artikel beleuchtet die Unterschiede zwischen dem nativen Wazuh-Format, dem Elastic Common Schema (ECS) sowie dem Splunk Common Information Model (CIM) und zeigt, wie Wazuh-Alerts mithilfe einer Ingest Pipeline direkt im Wazuh Indexer (OpenSearch/Elasticsearch) in eine CIM-kompatible Struktur überführt werden können.

Die Qual der Wahl: Wazuh-nativ, ECS oder CIM?

Bevor eine Transformation der Daten stattfindet, ist ein Verständnis der unterschiedlichen Datenmodelle notwendig. Jedes Modell verfolgt eine eigene Philosophie, die es für bestimmte Anwendungsfälle mehr oder weniger geeignet macht.

Splunk Common Information Model (CIM): CIM wurde für Splunk entwickelt und verfolgt einen anderen Ansatz. Es besteht aus einer Sammlung von flacheren, thematisch getrennten Datenmodellen (z.B. Authentication, Network Traffic). Die Stärke liegt in der Verwendung konsistenter Feldnamen (src_ip, dest_ip, user, action) über all diese Modelle hinweg. Dies vereinfacht die Korrelation von Ereignissen aus völlig unterschiedlichen Quellen erheblich und ist der De-facto-Standard in vielen Security Operations Centern (SOCs).

(CIM-Event - z.B. Network Traffic)
|
├── _time
├── action ("allowed", "blocked")
├── src_ip
├── dest_ip
├── src_port
└── dest_port

Elastic Common Schema (ECS): ECS ist ein offener Standard, der von Elastic vorangetrieben wird. Er zeichnet sich durch eine tief verschachtelte, objektorientierte Struktur aus. Felder werden in logischen Gruppen wie host, source, destination oder event organisiert. Dies fördert die Konsistenz innerhalb des Elastic Stacks, kann aber in Systemen, die eine flachere Struktur erwarten, zu Komplexität führen.

(ECS-Dokument)
|
├── @timestamp
├── ecs.version
├── event
|   ├── category
|   └── action
├── source
|   ├── ip
|   └── port
└── destination
    ├── ip
    └── port

Wazuh (Nativ): Das native Format ist stark auf die internen Mechanismen von Wazuh zugeschnitten. Es ist sehr detailliert und enthält spezifische Objekte wie rule, agent oder syscheck. Obwohl es für die Wazuh-interne Analyse perfekt ist, fehlt ihm die Standardisierung für die Interoperabilität mit anderen Systemen.

wazuh-alert
|
├── timestamp
├── rule
|   ├── level
|   └── id
├── agent
|   ├── id
|   └── name
├── data
|   ├── srcip
|   ├── dstip
|   └── user
└── syscheck
    ├── path
    └── mode

Für Umgebungen, in denen Interoperabilität und die Korrelation mit Werkzeugen wie Splunk eine Rolle spielen, ist die Transformation in das CIM-Format ein strategischer Vorteil.

Das Experiment: Wazuh-Events in CIM-Struktur überführen

Mein Ziel ist es, die reichhaltigen Informationen eines Wazuh-Alerts in eine flache, CIM-kompatible Struktur zu bringen, da es so einfacher ist, Use-Cases aus meinem Homelab bei meinem Arbeitgeber einzuführen. Das Werkzeug der Wahl hierfür ist bei Wazuh die Ingest Pipeline direkt im Wazuh Indexer. Diese Pipeline fängt jeden neuen Alert ab, bevor er indiziert wird, und wendet eine Reihe von Prozessoren an, um ihn zu modifizieren, anzureichern und umzustrukturieren.

Schritt 1: Effizienz als oberstes Gebot – IP-Felder normalisieren

Ein typischer Wazuh-Alert kann eine Quell-IP-Adresse in vielen verschiedenen Feldern enthalten (data.srcip, data.win.eventdata.ipAddress, etc.). Anstatt für jedes mögliche Feld einen eigenen Prozessor zu definieren, wird ein kurzes Painless-Skript verwendet, um alle potenziellen Felder zu durchsuchen und die gefundene IP in ein temporäres Feld (temp_ip_for_lookup) zu kopieren. Dies stellt sicher, dass nachfolgende Anreicherungen wie GeoIP oder Threat Intelligence nur ein einziges Feld prüfen müssen. So lässt sich auch die Prozessorzeit reduzieren.

{
  "script": {
    "description": "IP-Felder in ein temporäres Feld für effiziente Lookups zusammenführen",
    "lang": "painless",
    "source": """
      String[] potential_fields = new String[] {
        'data.srcip', 'data.src_ip', 'data.win.eventdata.ipAddress',
        'data.aws.sourceIPAddress', 'data.aws.client_ip',
        'data.aws.service.action.networkConnectionAction.remoteIpDetails.ipAddressV4',
        'data.aws.httpRequest.clientIp', 'data.gcp.jsonPayload.sourceIP',
        'data.office365.ClientIP'
      };
      for (String field : potential_fields) {
        if (ctx.containsKey(field) && ctx[field] != null) {
          ctx.temp_ip_for_lookup = ctx[field];
          break;
        }
      }
    """
  }
}

Schritt 2: Anreicherung mit Threat Intelligence

Bevor das finale Mapping erfolgt, können die normalisierten Daten angereichert werden. Ein klassisches Beispiel ist der Abgleich der Quell-IP mit einer Liste bekannter Tor-Exit-Nodes. Dies wird am effizientesten mit dem enrich-Prozessor realisiert. Der Prozess umfasst drei Stufen:

{
  "enrich": {
    "description": "Anreicherung mit Tor Exit Node Intelligence",
    "policy_name": "tor-nodes-policy",
    "field": "temp_ip_for_lookup",
    "target_field": "threat.tor_info",
    "ignore_missing": true
  }
}

Wenn eine IP aus einem Alert in der Tor-Liste gefunden wird, fügt dieser Prozessor dem Event automatisch ein threat.tor_info-Objekt hinzu.

Den Index mit den Tor-Exit-Nodes kann man per Cron-Job aktuell halten und alle 30 Minuten folgendes Script starten:

#!/bin/bash

# --- Variablen ---
THREAT_INTEL_DIR="/opt/threat-intel"
IPSET_FILE="${THREAT_INTEL_DIR}/tor_exits.ipset"
JSONL_FILE="${THREAT_INTEL_DIR}/tor_exits.jsonl"
SOURCE_INDEX=".enrich-tor-exit-nodes-source" # Der Index, aus dem die Anreicherung liest
INDEXER_URL="https://localhost:9200"
INDEXER_USER="admin" # anpassen
INDEXER_PASS="YourPassword" # anpassen

# --- 1. Datei herunterladen ---
wget -qO "$IPSET_FILE" https://iplists.firehol.org/files/tor_exits.ipset

# --- 2. In JSONL konvertieren ---
# Wir entfernen Kommentare, leere Zeilen und erstellen für jede IP ein JSON-Objekt.
# Wir fügen auch ein boolesches Feld hinzu, was die spätere Abfrage erleichtert.
grep -v '^#' "$IPSET_FILE" | grep . | \
awk '{
  print "{\"index\":{}}"
  print "{\"ip_address\":\"" $1 "\", \"is_tor_exit_node\":true, \"source_list\":\"firehol_tor_exits\"}"
}' > "$JSONL_FILE"

# --- 3. Alten Index löschen und neuen erstellen ---
# Dies ist der einfachste Weg, die Daten zu aktualisieren.
curl -k -u "${INDEXER_USER}:${INDEXER_PASS}" -s -X DELETE "${INDEXER_URL}/${SOURCE_INDEX}" > /dev/null
curl -k -u "${INDEXER_USER}:${INDEXER_PASS}" -s -X PUT "${INDEXER_URL}/${SOURCE_INDEX}" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "ip_address": { "type": "ip" },
      "is_tor_exit_node": { "type": "boolean" },
      "source_list": { "type": "keyword" }
    }
  }
}
'

# --- 4. Neue Daten per Bulk API hochladen ---
curl -k -u "${INDEXER_USER}:${INDEXER_PASS}" -s -X POST "${INDEXER_URL}/_bulk?pretty&refresh" -H 'Content-Type: application/x-ndjson' --data-binary "@${JSONL_FILE}" > /dev/null

echo "Tor exit nodes list updated and indexed."

Schritt 3: Das Herzstück – Das Mapping auf das Common Information Model

Dies ist der Kern der Transformation. Ein weiteres Painless-Skript führt die Umbenennung und Umstrukturierung durch:

  • Vendor-Objekt erstellen: Wazuh-spezifische, aber wertvolle Metadaten (rule, agent, manager, id) werden in ein sauberes, verschachteltes wazuh-Objekt verschoben. Dies hält das Top-Level-Schema sauber, bewahrt aber die Kontextinformationen.
  • CIM-Felder mappen: Wazuh-Felder werden in ihre CIM-Äquivalente umbenannt (z.B. data.srcip zu src, data.dstip zu dest).
  • Logik anwenden: Abstrakte CIM-Felder wie action werden basierend auf der Logik des Events erstellt. Ein Wazuh-Rule-Level >= 8 könnte beispielsweise zu action = "failure" führen, während ein data.status mit “allowed” zu action = "success" wird.
  • Spezifische Modelle bedienen: Ereignistypen wie File-Integrity-Monitoring (syscheck) werden auf die Felder des CIM Change-Datenmodells (object, object_category, change_type) abgebildet.

Die finale Pipeline: Alles in einem

Zusammengesetzt ergibt sich eine leistungsstarke Ingest Pipeline, die jeden Wazuh-Alert in Echtzeit validiert, normalisiert, anreichert, transformiert und bereinigt, bevor er überhaupt in der Datenbank landet. Das Endergebnis ist ein sauberes, schlankes und CIM-kompatibles Event.

Hier ist die vollständige Pipeline-Definition:

{
  "description": "Wazuh-Alerts nach Splunk CIM transformieren, inklusive Threat Intel Anreicherung",
  "processors": [
    {
      "json": {
        "description": "1. Eingehende JSON-Nachricht parsen",
        "field": "message",
        "add_to_root": true
      }
    },
    {
      "script": {
        "description": "2. IP-Felder für effiziente Lookups zusammenführen",
        "lang": "painless",
        "source": "String[]p=new String[]{'data.srcip','data.src_ip','data.win.eventdata.ipAddress','data.aws.sourceIPAddress','data.aws.client_ip','data.aws.service.action.networkConnectionAction.remoteIpDetails.ipAddressV4','data.aws.httpRequest.clientIp','data.gcp.jsonPayload.sourceIP','data.office365.ClientIP'};for(String f:p){if(ctx.containsKey(f)&&ctx[f]!=null){ctx.temp_ip_for_lookup=ctx[f];break;}}"
      }
    },
    {
      "enrich": {
        "description": "3. Anreicherung mit Tor Exit Node Intelligence",
        "policy_name": "tor-nodes-policy",
        "field": "temp_ip_for_lookup",
        "target_field": "threat.tor_info",
        "ignore_missing": true
      }
    },
    {
      "geoip": {
        "description": "4. GeoIP-Lookup einmalig durchführen",
        "field": "temp_ip_for_lookup",
        "target_field": "source.geo",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "description": "5. Kern-Mapping: Wazuh-Felder auf Splunk CIM abbilden",
        "lang": "painless",
        "source": """
          // M. Meister
          // Vendor-Objekt für Wazuh-Metadaten erstellen
          ctx.wazuh = new HashMap();
          if (ctx.containsKey('rule')) { ctx.wazuh.rule = ctx.rule; ctx.remove('rule'); }
          if (ctx.containsKey('agent')) { ctx.wazuh.agent = ctx.agent; }
          if (ctx.containsKey('manager')) { ctx.wazuh.manager = ctx.manager; ctx.remove('manager'); }
          if (ctx.containsKey('id')) { ctx.wazuh.alert_id = ctx.id; ctx.remove('id'); }
          if (ctx.containsKey('full_log')) { ctx.wazuh.full_log = ctx.full_log; ctx.remove('full_log'); }
          if (ctx.containsKey('decoder')) { ctx.wazuh.decoder = ctx.decoder; ctx.remove('decoder'); }

          // --- Generische CIM-Felder ---
          if (ctx.containsKey('wazuh.rule.level')) {
            if (ctx.wazuh.rule.level >= 8) { ctx.action = "failure"; }
            else if (ctx.wazuh.rule.level > 0 && ctx.wazuh.rule.level <= 4) { ctx.action = "success"; }
          }
          if (ctx.containsKey('data') && ctx.data.containsKey('status')) {
             if (ctx.data.status.toLowerCase().contains('success') || ctx.data.status.toLowerCase().contains('allowed')) { ctx.action = "success"; }
             else if (ctx.data.status.toLowerCase().contains('fail') || ctx.data.status.toLowerCase().contains('denied') || ctx.data.status.toLowerCase().contains('blocked')) { ctx.action = "failure"; }
          }

          // --- Network Traffic Data Model ---
          if(ctx.containsKey('data')) {
            if (ctx.data.containsKey('srcip')) { ctx.src = ctx.data.srcip; }
            if (ctx.data.containsKey('dstip')) { ctx.dest = ctx.data.dstip; }
            if (ctx.data.containsKey('srcport')) { ctx.src_port = ctx.data.srcport; }
            if (ctx.data.containsKey('dstport')) { ctx.dest_port = ctx.data.dstport; }
            if (ctx.data.containsKey('protocol')) { ctx.transport = ctx.data.protocol; }
          }

          // --- Authentication Data Model ---
          if (ctx.containsKey('data') && ctx.data.containsKey('user')) { ctx.user = ctx.data.user; }
          if (ctx.wazuh.agent != null && ctx.wazuh.agent.containsKey('name')) { ctx.dest_host = ctx.wazuh.agent.name; }

          // --- Endpoint / Change Data Model ---
          if (ctx.containsKey('syscheck')) {
             ctx.object = ctx.syscheck.path;
             ctx.object_category = "file";
             if (ctx.syscheck.mode == 'added') { ctx.change_type = 'created'; }
             else if (ctx.syscheck.mode == 'deleted') { ctx.change_type = 'deleted'; }
             else if (ctx.syscheck.mode == 'modified') { ctx.change_type = 'modified'; }
             ctx.wazuh.syscheck = ctx.syscheck;
             ctx.remove('syscheck');
          }
          
          ctx.remove('agent');
        """
      }
    },
    {
      "date": {
        "description": "6. Primären Zeitstempel setzen",
        "field": "timestamp",
        "target_field": "@timestamp",
        "formats": ["ISO8601"]
      }
    },
    {
      "remove": {
        "description": "7. Finale Bereinigung temporärer und redundanter Felder",
        "fields": [
          "message", "timestamp", "data", "location", "temp_ip_for_lookup",
          "ecs", "beat", "tags", "@version", "log", "host", "event"
        ],
        "ignore_missing": true
      }
    }
  ],
  "on_failure" : [{
    "set": {
      "field": "error.message",
      "value": "## WA_CIM_PIPELINE_ERROR ## {{ _ingest.on_failure_message }}"
    }
  }]
}

Fazit

Die Transformation von Wazuh-Alerts in das Splunk Common Information Model (CIM) ist kein reiner Selbstzweck, sondern ein strategischer Schritt zur Erhöhung der Interoperabilität und Analysefähigkeit in heterogenen IT-Sicherheitslandschaften. So lassen sich Usecases, die ich mühevoll im Homelab entwickelt habe, in den dienstlichen Alltag übertragen, ohne die Feldnahmen in den Abfragen ändern zu müssen. So können auch Grafana-Dashboards mit ins Berufsleben mitgenommen werden - aber auch umgekehrt.