OPNsense Forum

International Forums => German - Deutsch => Topic started by: superwinni2 on May 20, 2019, 10:11:27 am

Title: ELK Stack / Logging / Rollup Job
Post by: superwinni2 on May 20, 2019, 10:11:27 am
Hallo zusammen


an die Leute die sich mit ELK auskennen oder es sogar auch zum Loggen der FWLog benutzen:


Bin gerade dabei eine kleien Anleitung zu schreiben um die FWLogs mit ELK zu speichern und zu durchsuchen...
Kann mir jemand helfen beim einstellen des Rollup Jobs damit ich nur noch 10 Felder anstatt der kompletten 38 Felder speichere?
Brauche bei mir aktuell ~3 GB/Tag zum speichern der 38 Felder... Hoffe dies kann ich so reduzieren.
Bekomme es selbst leider nicht hin...

Danke und Gruß
Title: Re: ELK Stack / Logging / Rollup Job
Post by: fabian on May 20, 2019, 07:34:25 pm
ich würde dir raten, unwichtige felder schon im logstash raus zu schmeißen, dann hast du die arbeit am schluss nicht.
Title: Re: ELK Stack / Logging / Rollup Job
Post by: superwinni2 on May 21, 2019, 07:46:50 am
Hallo Fabian


habe doch gerade gedacht... Das Profilbild kennt man doch irgendwoher...
Und siehe da... Ich mach das aktuell mit deiner opnsense-logstash-config (https://github.com/fabianfrz/opnsense-logstash-config) :D


Da ich leider jedoch relativ wenig Ahnung von der ganzen Materie habe wegen dem Filtern etc.. Hast mir da vllt einen Tipp? :P


Meine aktuelle config sieht (bis auf den Port) wie dein Original aus.
Code: [Select]
input {
tcp {
port => 5140
type => syslog
}
udp {
port => 5140
type => syslog
}
}


filter {
if [type] == "syslog" {
grok {
match => { "message" => "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
}
mutate { strip => ["syslog_message"] }
if [syslog_program] == "devd" {
if "!system=CAM" in [syslog_message] {
grok {
match => {"syslog_message" => "Processing event %{QUOTEDSTRING:data}"}
}
kv {source => "data"}
mutate { remove_field => 'data' }
}
drop { }
}
if [syslog_program] == "filterlog" {
opnsensefilter { field_name => "syslog_message" }
geoip { source => "source" }
if [geoip.ip] {
geoip {
source => "source"
target => "destination"
}
}
if ![geoip.ip] {
geoip { source => "destination" }
}
}
if [syslog_program] == "suricata" {
geoip { source => "src_ip" }
if [geoip.ip] {
geoip {
source => "src_ip"
target => "dest_ip"
}
}
if ![geoip.ip] {
geoip { source => "dest_ip" }
}
}
if [syslog_program] == "opnsense" {
if "for" in [syslog_message] and "from" in [syslog_message] {
mutate { add_field => {'os_type' => 'auth'} }
if "from:" in [syslog_message] {
grok {
match => {
"syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for user '%{USERNAME:username}' from: %{DATA:ip}"
}
}
}
else {
grok {
match => {
"syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for '%{USERNAME:username}' from %{DATA:ip}"
}
}
}
}
}
if [syslog_program] == "configd.py" {
if "message" in ["syslog_message"] {
grok {
match => {
"syslog_message" => "message %{UUID:uuid} \[%{DATA:action_name}\] returned %{WORD:status_word}.*"
}
}
}
if [syslog_message] =~ "^\[.+?\]" {
grok {
match => {"syslog_message" => "\[%{UUID:uuid}\] %{GREEDYDATA:configd_message}"}
}
}
if [syslog_message] =~ "^\S+* generated \S+$" {
grok {
match => {"syslog_message" => "^%{NOTSPACE:component_name} generated %{NOTSPACE:file_name}$"}
}
}
#mutate { remove_field => 'syslog_message' }
}
if [syslog_program] == "/usr/sbin/cron" {
grok {
match => {"syslog_message" => "\(%{USER:user}\) CMD %{GREEDYDATA:cron_message}"}
}
mutate { remove_field => 'syslog_message' }
}
if [syslog_program] in ["ospfd", "ospf6d"] {
if ":" in [syslog_message] {
grok {
match => {"syslog_message" => "%{DATA:component}: %{GREEDYDATA:sub_message}"}
}
}
if ":" in [sub_message] and "# Areas" not in [sub_message] {
grok {
match => {"sub_message" => "%{DATA:subcomponent}: %{GREEDYDATA:msg}"}
}
mutate { remove_field => "sub_message" }
mutate { rename => {"msg" => "sub_message"} }
}
if [syslog_message] =~ /^\S+\(\S+\).*/ {
grok {
match => {"syslog_message" => "%{NOTSPACE:component}\(%{NOTSPACE:function_name}\) %{GREEDYDATA:sub_message}"}
}
}
if [component] == "SPF" {
grok {
match => {"sub_message" => "Scheduled in %{NUMBER:scheduled} msec"}
}
}
if [component] == "SPF processing" {
grok {
match => {"sub_message" => "# Areas: %{NUMBER:number_areas}, SPF runtime: %{NUMBER:runtime_sec} sec %{NUMBER:runtime_usec} usec, Reason: %{GREEDYDATA:reason}"}
}
}

}
#"SPF processing: # Areas: 1, SPF runtime: 0 sec 0 usec, Reason: R+, R-"
#"OSPF6d (Quagga-1.2.1 ospf6d-0.9.7r) starts: vty@2606"
if [syslog_program] == "zebra" {
#"client 18 says hello and bids fair to announce only ospf6 routes"
}
}
}


output {
#stdout { codec => rubydebug }
elasticsearch {
hosts =>  "http://localhost:9200"
index => "logstash-opnsense-syslog-%{+YYYY.MM.dd}"
}
}
Title: Re: ELK Stack / Logging / Rollup Job
Post by: fabian on May 21, 2019, 05:15:40 pm
Einfach in der Filtersektion das hier machen:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-remove_field (https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-remove_field)
PS: In dem Fall funktioniert das Foto ;)
Title: Re: ELK Stack / Logging / Rollup Job
Post by: superwinni2 on June 03, 2019, 08:54:30 am
Habe vor einiger Zeit die config mal wie folgt angepasst:
Kannst du bitte mal drüber schauen ob es so richtig ist?


Code: [Select]
input {
  tcp {
    port => 5140
    type => syslog
  }


  udp {
    port => 5140
    type => syslog
  }
}


filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }


    mutate { strip => ["syslog_message"] }


    if [syslog_program] == "devd" {
      if "!system=CAM" in [syslog_message] {
        grok {
          match => {"syslog_message" => "Processing event %{QUOTEDSTRING:data}"}
        }


        kv {source => "data"}


        mutate { remove_field => 'data' }
      }


      drop { }
    }


    if [syslog_program] == "filterlog" {
      opnsensefilter { field_name => "syslog_message" }


      geoip { source => "source" }


      if [geoip.ip] {
        geoip {
          source => "source"
          target => "destination"
        }
      }


      if ![geoip.ip] {
        geoip { source => "destination" }
      }
    }


    if [syslog_program] == "suricata" {
      geoip { source => "src_ip" }


      if [geoip.ip] {
        geoip {
          source => "src_ip"
          target => "dest_ip"
        }
      }


      if ![geoip.ip] {
        geoip { source => "dest_ip" }
      }
    }


    if [syslog_program] == "opnsense" {
      if "for" in [syslog_message] and "from" in [syslog_message] {
        mutate { add_field => {'os_type' => 'auth'} }


        if "from:" in [syslog_message] {
          grok {
            match => {
              "syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for user '%{USERNAME:username}' from: %{DATA:ip}"
            }
          }
        } else {
          grok {
            match => {
              "syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for '%{USERNAME:username}' from %{DATA:ip}"
            }
          }
        }
      }
    }


    if [syslog_program] == "configd.py" {
      if "message" in ["syslog_message"] {
        grok {
          match => {
            "syslog_message" => "message %{UUID:uuid} \[%{DATA:action_name}\] returned %{WORD:status_word}.*"
          }
        }
      }


      if [syslog_message] =~ "^\[.+?\]" {
        grok {
          match => {"syslog_message" => "\[%{UUID:uuid}\] %{GREEDYDATA:configd_message}"}
        }
      }


      if [syslog_message] =~ "^\S+* generated \S+$" {
        grok {
          match => {"syslog_message" => "^%{NOTSPACE:component_name} generated %{NOTSPACE:file_name}$"}
        }
      }


      #mutate { remove_field => 'syslog_message' }
    }


    if [syslog_program] == "/usr/sbin/cron" {
      grok {
        match => {"syslog_message" => "\(%{USER:user}\) CMD %{GREEDYDATA:cron_message}"}
      }


      mutate { remove_field => 'syslog_message' }
    }


    if [syslog_program] in ["ospfd", "ospf6d"] {
      if ":" in [syslog_message] {
        grok {
          match => {"syslog_message" => "%{DATA:component}: %{GREEDYDATA:sub_message}"}
        }
      }


      if ":" in [sub_message] and "# Areas" not in [sub_message] {
        grok {
          match => {"sub_message" => "%{DATA:subcomponent}: %{GREEDYDATA:msg}"}
        }


        mutate { remove_field => "sub_message" }


        mutate { rename => {"msg" => "sub_message"} }
      }


      if [syslog_message] =~ /^\S+\(\S+\).*/ {
        grok {
          match => {"syslog_message" => "%{NOTSPACE:component}\(%{NOTSPACE:function_name}\) %{GREEDYDATA:sub_message}"}
        }
      }


      if [component] == "SPF" {
        grok {
          match => {"sub_message" => "Scheduled in %{NUMBER:scheduled} msec"}
        }
      }


      if [component] == "SPF processing" {
        grok {
          match => {"sub_message" => "# Areas: %{NUMBER:number_areas}, SPF runtime: %{NUMBER:runtime_sec} sec %{NUMBER:runtime_usec} usec, Reason: %{GREEDYDATA:reason}"}
        }
      }
    }


    #"SPF processing: # Areas: 1, SPF runtime: 0 sec 0 usec, Reason: R+, R-"
    #"OSPF6d (Quagga-1.2.1 ospf6d-0.9.7r) starts: vty@2606"
    if [syslog_program] == "zebra" {
      #"client 18 says hello and bids fair to announce only ospf6 routes"
    }
  }




  #Hier werden alle Felder/Keys eingegeben, welche nicht an Elasticsearch übermittelt werden sollen.
  mutate {
    remove_field => [
                      "ack_number",
                      "aid",
                      "ecn",
                      "direction_of_traffic",
                      "flags",
                      "[geoip][continent_code]",
                      "[geoip][country_code2]",
                      "[geoip][country_code3]",
                      "[geoip][ip]",
                      "[geoip][latitude]",
                      "[geoip][longitude]",
                      "[geoip][region_name]",
                      "[geoip][region_code]",
                      "[geoip][postal_code]",
                      "[geoip][timezone]",
                      "hop_limit",
                      "ip_version",
                      "length",
                      "message",
                      "myoffset",
                      "options",
                      "protocol_id",
                      "reason",
                      "sequence_number",
                      "subrule",
                      "syslog_message",
                      "syslog_pri",
                      "syslog_program",
                      "syslog_timestamp",
                      "tags",
                      "tcp_flags",
                      "tos",
                      "type",
                      "urgent_pointer",
                      "window"
                    ]
  }
}


output {
  #stdout { codec => rubydebug }
  elasticsearch {
    hosts =>  "http://localhost:9200"
    index => "logstash-opnsense-syslog-%{+YYYY.MM.dd}"
  }
}


Oder gibt es noch irgendwelche Verbesserungsvorschläge?


Danke und Gruß
Title: Re: ELK Stack / Logging / Rollup Job
Post by: fabian on June 03, 2019, 07:12:25 pm
ob's richtig ist, musst du für dich selbst entscheiden ;)

Ich halte ip_version zum Beispiel für wichtig.
Title: Re: ELK Stack / Logging / Rollup Job
Post by: superwinni2 on June 05, 2019, 09:16:02 am
Da wir intern sowie extern so oder so nur IPv4 haben brauche ich das meiner Meinung nicht...
Hatte zumidnest noch nie den Fall dass ich etwas suchen mutte das IPv4 oder IPv6 gefiltert war...

Mit richtig meinte ich eigentlich ob es vielleicht doch eine "schönere" Variante gibt wie das was ich hier habe...
Habe es zwar hinbekommen, aber verstanden noch lange nicht  ::) :P
Title: Re: ELK Stack / Logging / Rollup Job
Post by: mimugmail on June 05, 2019, 10:02:04 am
Das Problem ist nicht, dass du es selbst nicht verstehst, sondern du dir selbst keine Anforderungen stellst, was genau du mit dem ELK erreichen willst.

Wenn du es einfach nur haben willst weil es toll ist, reicht das :)
Title: Re: ELK Stack / Logging / Rollup Job
Post by: superwinni2 on June 05, 2019, 10:29:44 am
Anforderung war erstmal:
Loggiong der Firewall um nachzuschauen wann etwas blockiert wird um Fehler zu suchen.
Zudem dies möglichst Platzsparend zu realisieren.


Angefangen habe ich mit ELK 6.6.1.
Dort hat das Logging dann pro Tag ~3-4 GB gefressen.
Dann auf 7.0.1 gewechselt. Dann waren es ~2-3 GB.
Dann die config geändert und nun noch bei 0.3 - 0.6 GB pro Tag.


Ich habe zwar etwas in die Config geschrieben und es funktioniert. Allerdings 100% verstanden habe ich es nicht. Bin vielleicht auch noch etwas zu grün hinter den Ohren wenn es um ELK geht... Kannte es davor gar nicht.


Und da früher oder später auch noch der NetFlow sowie diverse andere Logs vielleicht damit geloggt werden sollen, wäre das "Verstehen des Systems" sicher nicht von nachteil :)


Grüßle
Title: Re: ELK Stack / Logging / Rollup Job
Post by: JeGr on June 05, 2019, 12:43:52 pm
> Loggiong der Firewall um nachzuschauen wann etwas blockiert wird um Fehler zu suchen.

Dann ist IP Version relevant. Ob du v6 nutzt oder nicht ist egal, aber es gibt genug Fälle wo v6 trotzdem automatisch gesprochen wird. Und gerade wenn es nicht sein sollte, will man das wissen.
Title: Re: ELK Stack / Logging / Rollup Job
Post by: superwinni2 on June 05, 2019, 10:33:47 pm
Guter Gedanke  :)  Danke schonmals

Hätte dazu aber noch ne Rückfrage...
Sieht / Erkennt man dies dann nicht anhand von der IP?
Title: Re: ELK Stack / Logging / Rollup Job
Post by: ruffy91 on June 06, 2019, 07:36:47 am
Ja. Und damit du dies in einem query brauchen kannst (z.B. Kuchendiagramm das zeigt wieviel % der Pakete IPv6 sind) führst du dann ein Feld ein wo du drin speicherst ob es v4 oder v6 ist -> besser gleich von Anfang an speichern.

Gesendet von meinem MI 9 mit Tapatalk