Logstash: Construire un pipeline de traitement de données pour Bro IDS

Posté par Binor le 05/09/2017

Dans ce blog post, le second de notre série sur la suite ELK, nous présentons une introduction à Logstash. Par définition, Logstash est un pipeline de traitement de données qui fournit les composants pour lire des données provenant de diverses sources, pour transformer/enrichir ces données et enfin pour les renvoyer à une base de données ou à un autre pipeline pour d’additionnel traitement. Avec son architecture modulaire, Logstash offre un cadre robuste pour construire facilement un pipeline de traitement de données. Pour montrer certaines des fonctionnalités de Logstash, nous utiliserons les log générés par Bro comme entrée pour notre pipeline de traitement de données.

Notre exemple de pipeline de traitement de données

L’installation de Logstash est simple. Vous pouvez télécharger les binaires officiels pour toutes les majeure distribution Linux. Le seul prérequis dont vous aurez besoin est Java (OpenJDK ou la distribution officiel par Oracle). Après l’installation, selon vos besoins, vous pouvez modifier les deux fichiers suivants:

  • LOGSTASH_INSTALL_PATH/jvm.options: pour augmenter la taille de la pile (Heap) Java.
  • LOGSTASH_INSTALL_PATH/logstash.yml: pour modifier les configurations Logstash comme le nombre de worker, la taille du lot (nombre d’unité de donnée qu’un worker traitera) …

Après l’installation de Logstash, la prochaine étape est la configuration de la pipeline de données. Avant d’entrer dans les détails de la configuration, il est important de noter que dans le jargon Logstash, un événement reçu via l’une des multiple sources supportées est représenté par un objet de données (JSON). Cet objet peut être transformé en utilisant un ou plusieurs des filtres et expédié vers un autre pipeline de traitement ou une base de données via un des plugins de sortie (outputs).

Input

Nous commençons d’abord avec les plugins d’entrée: les composants en charge de la collecte de données. Par défaut, Logstash fournit plusieurs plugins d’entrée. La liste des plugins Logstash contient des functions simple comme la lecture d’un fichier ou l’écoute sur un port TCP/UDP. Elle contient également des plugins pour lire à partir de systèmes de messagerie/files d’attente comme Redis ou Kafka ou à partir d’une base de données (en utilisant l’interface JDBC). La liste complète des plugins d’entrée est disponible dans la documentation officielle de Logstash.

Dans notre exemple, nous utilisons Filebeat pour lire les fichiers de log générés par Bro et les envoyer au plugin d’entrée Beats de Logstash.

input {
	beats {
		port => 5044
	}
}

La configuration d’entrée Filebeat suivante est utilisée pour envoyer les logs Bro à notre pipeline de traitement Logstash.

- input_type: log
  paths:
    - /opt/bro/logs/current/conn.log
  exclude_lines: ['^#']
  fields:
    logtype: broconn
  document_type: logstash-bro
  fields_under_root: true

Nous ajoutons ici des champs supplémentaires à l’objet de donnée généré par le processus Filebeat. Le champ logtype est utilise ici pour ajouter un tag aux données et ainsi définir les règles de filtrage/analyse qui seront appliqués sur le message de log par la suite.

Output

Pour la sortie de notre pipeline de traitement, Logstash offre plusieurs choix. Dans notre example, nous utilisons le plugin de sortie Elasticsearch. Les logs Bro sont sauvegarder dans un index Elasticsearch nommé dynamiquement en fonction du type et de la date d’occurence de l’événement. Avec l’exemple de configuration suivant, les logs seront stockés dans un index nommé: logstash-bro-2017.09.06.

output {
  elasticsearch {
    hosts => ["127.0.0.1"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}

Filtres

Après la définition de l’entrée et de la sortie de notre pipeline de traitement des données, la dernière étape est la définition des filtres qui seront appliqués à ces données. Logstash propose une variété de filtres prêts à être utilisés pour analyser les données et extraire des méta-données. L’analyse peut être effectuée en utilisant des expressions régulières ou des structures CSV ou aussi des paires clé/valeur. En outre, certains de ces filtres permettent d’enrichir les données avec des informations de localisation géographique. Avec le filtre Ruby, vous bénéficier d’un grand degré de liberté avec la possibilité de transformer vos données en leurs applicant du code Ruby.

Comme mentionné ci-dessus, nous utilisons les logs Bro comme exemple pour démontrer les capacités Logstash. En particulier, nous utilisons les types de log Bro suivants:

  • CONN: les logs des flux réseaux, montrant un résumé de la connexion IP vue sur le réseau.
  • HTTP: les logs des connexions HTTP, montrant un résumé du trafic HTTP.
  • DNS: les logs du trafic DNS, montrant un résumé des requêtes DNS et de leurs réponses.

Les exemples suivants montrent des échantillons des logs générés par Bro:

# conn.log
1503305215.555986       CN77ln4RMSpStQV2X9      10.10.2.201     34240   145.239.49.27   80      tcp     http    2.1
52318   606     186     SF      T       F       0       ShADadfF        6       866     5       390     (empty)
# http.log
1503305207.181153       CfHZwi2X7yU6i6uk27      10.10.2.201     55178   54.243.45.82    80      1       GET     pin
g.chartbeat.net /ping?h=lemonde.fr&p=/&u=D2WApBJg9NeBfqUX-&d=lemonde.fr&g=12231&n=1&f=00001&c=0&x=0&m=0&y=16423&o=1
340&w=501&j=45&R=1&W=0&I=0&E=0&e=0&r=&b=41665&t=Dv-EDdCijdiMTqbSiDDktxxMlkUO&V=93&i=Le Monde.fr - Actualit\xc3\xa9s
 et Infos en France et dans le monde&tz=0&sn=1&EE=0&sv=C3gSusB3FL0pFM9d3lXu6j_19yB&_    http://www.lemonde.fr/  1.1
        Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/604.1 (KHTML, like Gecko) Version/11.0 Safari/604.1 elementary
OS/0.4 (Loki) Epiphany/3.18.11  0       43      200     OK      -       -       (empty) -       -       -       -
      - -       FaItHM1BXflwyPgKy4      -       image/gif

# dns.log
1503305221.493568       CIkYyh2UsRrISregCg      10.10.2.201     41847   10.10.2.1       53      udp     27035   0.0
00446   ping.chartbeat.net      1       C_INTERNET      1       A       0       NOERROR F       F       T       T
0       54.243.45.82    45.000000       F

La première étape du traitement des logs Bro est l’analyse des données pour extraire les métadonnées correspondantes. Logstash fournit un filtre puissant pour faire ceci: grok. À l’aide de la documentation Bro, nous pouvons écrire des expressions régulières grok pour analyser les types de log mentionnés ci-dessus. En se basant sur ces échantillons, nous pouvons voir que les trois types de log partagent certains champs communs. Nous commençons par extraire les champs communs comme la date de l’événement, les adresses IP (source et destination) et les ports (source et destination). Les expressions régulières grok suivantes définissent la structure des logs Bro que nous avons énumérés ci-dessus.

LOG_HEADER %{BASE16FLOAT:bro_ts}\t(-|%{DATA:cuid})\t(-|%{IP:src_ip})\t(-|%{INT:src_port})\t(-|%{IP:dst_ip})\t(-|%{INT:dst_port})\t%{GREEDYDATA:bro_message}

CONN (-|%{DATA:proto})\t(-|%{DATA:service})\t(-|%{BASE16FLOAT:conn_duration})\t(-|%{INT:bytes_sent})\t(-|%{INT:bytes_received})\t(-|%{DATA:conn_state})\t(-|%{DATA:local_orig})\t(-|%{DATA:local_resp})\t(-|%{INT:missing_bytes})\t(-|%{DATA:conn_history})\t(-|%{INT:orig_pkts})\t(-|%{INT:orig_ip_bytes})\t(-|%{INT:resp_pkts})\t(-|%{INT:resp_ip_bytes})\t%{GREEDYDATA:tunnel_parents}

HTTP %{INT:trans_depth}\t(-|%{DATA:http_method})\t(-|%{DATA:domain})\t(-|%{DATA:http_uri})\t(-|%{DATA:referer})\t(-|%{DATA:http_version})\t(-|%{DATA:user_agent})\t(-|%{INT:bytes_sent})\t(-|%{INT:bytes_received})\t(-|%{INT:http_resp_code})\t(-|%{DATA:http_resp_msg})\t(-|%{INT:http_info_code})\t(-|%{DATA:http_info_msg})\t(-|%{DATA:http_tags})\t(-|%{DATA:username})\t(-|%{DATA:http_pwd})\t(-|%{DATA:http_proxied})\t(-|%{DATA:sent_fuids})\t(-|%{DATA:sent_file_name})\t(-|%{DATA:sent_mime_types})\t(-|%{DATA:resp_fuids})\t(-|%{DATA:resp_file_name})\t(-|%{GREEDYDATA:resp_mime_types})

DNS %{DATA:proto}\t(-|%{DATA:dns_trans_id})\t(-|%{BASE16FLOAT:conn_duration})\t%{DATA:domain}\t(-|%{INT:dns_qclass})\t%{DATA:query_class}\t(-|%{INT:dns_qtype})\t%{DATA:dns_query_type}\t(-|%{INT:dns_rcode})\t%{DATA:dns_resp_code}\t%{DATA:dns_aa}\t%{DATA:dns_tc}\t%{DATA:dns_rd}\t%{DATA:dns_ra}\t%{DATA:dns_z}\t(-|%{DATA:dns_answers})\t(-|%{DATA:dns_answers_ttls})\t%{GREEDYDATA:dns_rejected}

L’étape suivante, après la définition des expressions réguliers grok, est la définition des filtres. Ces filtres sont des scripts qui appliquent les expressions régulières grok aux données reçus et les enrichissent avec des méta-données.

filter {
  if [type] == "logstash-bro" and [logtype] == "broconn" {
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{LOG_HEADER}" }
    }
    if [bro_message] {
      grok {
     	  patterns_dir => ["/etc/logstash/patterns"]
        match => { "bro_message" => "%{CONN}" }
      }
      mutate {
        replace => [ "message", "%{bro_message}" ]
        remove_field => [ "bro_message" ]
      }
    }
    mutate {
      convert => { "dst_port" => "integer" }
      ...
    }
    ...
    if [bro_ts] {
      date {
        match => [ "bro_ts", "UNIX"]
        timezone => "UTC"
      }
      mutate { remove_field => [ "bro_ts" ] }
    }
  }
}

Dans l’extrait de code ci-dessus, nous appliquons le filtre grok aux log de connexion Bro en utilisant l’expression régulière CONN que nous avons défini précédemment. Nous utilisons également le filtre mutate pour convertir le port de destination de la connection (dst_port) en nombre. Cela indique à Elasticsearch de traiter ce champ comme un entier durant son processus d’indexation. Ceci va permettre l’application de certaines fonctionnalité de recherche et/ou d’aggregation sur ce champ.

Un autre exemple de plugins Logstash est le filtre Ruby. En utilisant ce filtre, nous pouvons transformer ou enrichir les données traversant la pipeline à l’aide d’un script Ruby. Dans l’exemple des logs DNS, le champ dns_answers contient une liste d’attributs (par exemple IP) associés au nom de domaine en question. L’objectif ici est d’extraire le ou les IP associés au nom de domaine et les sauvegarder dans un nouveau champ.

if [dns_answers] {
	mutate {
		split => { "dns_answers" => "," }
	}
	ruby {
		init => "require 'resolv'"
		code => "
			aa_list = event.get('dns_answers')
			ll = aa_list.length
			d_ips = []
			(1..ll).each do |i|
				vv = aa_list[i-1]
				if vv =~ Resolv::IPv4::Regex || vv =~ Resolv::IPv6::Regex
					d_ips.push(vv)
				end
			end
			if d_ips.length > 0
				event.set('domain_ips', d_ips)
			end
		"
	}
}

Dans l’extrait de code ci-dessus, nous utilisons d’abord l’opérateur split du filtre mutate pour transformer le champ dns_answers en une liste. Ensuite, nous utilisons un script ruby pour parcourir cette liste et en extraire les addresses IP. Ces addresses sont ensuite ajoutés à un nouveau champ appelé domain_ips.

Notre dernier exemple des filtres Logstash est le filtre geoip. Avec ce filtre, nous pouvons enrichir nos données avec les coordonnés géographique associes aux addresses IP. Logstash contient par défaut la base de données GeoLite2 City de Maxmind. Pour obtenir une version plus à jour ou plus détailler (payante), vous avez la possibilité de télécharger la base de données et de configurer le filtre pour l’utiliser.

if [src_ip] and ![src_location] {
	if [src_ip] !~ /^10\./ and [src_ip] !~ /^192\.168\./ and [src_ip] !~ /^172\.(1[6-9]|2[0-9]|3[0-1])\./ {
		geoip {
			source => "src_ip"
			target => "src_geoip"
			database => "/etc/logstash/vendors/GeoLite2-City.mmdb"
			add_field => [ "src_location", "%{[src_geoip][longitude]}" ]
			add_field => [ "src_location", "%{[src_geoip][latitude]}"  ]
		}
		mutate {
			convert => [ "src_location", "float" ]
		}
	}
}

Dans l’exemple ci-dessus, nous ajoutons deux nouveaux attributs: src_geoip et src_location. Le premier est un objet JSON qui contient les données générés par le filtre geoip, en particulier le nom du pays et le nom de la ville entre autre. Le second attribut contient la longitude et la latitude correspondant à la location de l’adresse IP. Ce champ est essentiellement utilisé pour le type de données Elasticsearch geo_point. Ce choix va nous permettre d’effectuer des agrégations et recherche à base de distance géographique. Pour activer cette fonctionnalités, nous mettons à jour le schema de notre index Elasticsearch pour spécifier que le champ src_location est de type geo_point.

	"src_location" : { "type": "geo_point"},

Ceci conclut notre présentation sur l’utilisation de Logstash pour construire un pipeline de traitement des données. Nous avons utilisé ici comme exemple de données les logs générés par le système de detection d’intrusion réseaux Bro. Notre objectif ici était de donner un aperçu des capacités de Logstash comme outil ETL. Vous trouverez plus de détails sur les scripts et les configurations présentés ici dans notre répertoire GitHub.

Connecte avec nous!


+222 45 29 00 29

+222 45 29 85 40