International Forums > German - Deutsch
ELK Stack / Logging / Rollup Job
superwinni2:
Hallo zusammen
an die Leute die sich mit ELK auskennen oder es sogar auch zum Loggen der FWLog benutzen:
Bin gerade dabei, eine kleine Anleitung zu schreiben, um die FWLogs mit ELK zu speichern und zu durchsuchen...
Kann mir jemand helfen beim Einstellen des Rollup Jobs, damit ich nur noch 10 Felder anstatt der kompletten 38 Felder speichere?
Brauche bei mir aktuell ~3 GB/Tag zum speichern der 38 Felder... Hoffe dies kann ich so reduzieren.
Bekomme es selbst leider nicht hin...
Danke und Gruß
fabian:
ich würde dir raten, unwichtige felder schon im logstash raus zu schmeißen, dann hast du die arbeit am schluss nicht.
superwinni2:
Hallo Fabian
habe doch gerade gedacht... Das Profilbild kennt man doch irgendwoher...
Und siehe da... Ich mach das aktuell mit deiner opnsense-logstash-config :D
Da ich leider jedoch relativ wenig Ahnung von der ganzen Materie habe wegen dem Filtern etc.. Hast mir da vllt einen Tipp? :P
Meine aktuelle config sieht (bis auf den Port) wie dein Original aus.
--- Code: ---input {
  # Accept OPNsense syslog on the same port over both TCP and UDP.
  tcp {
    port => 5140
    type => syslog
  }
  udp {
    port => 5140
    type => syslog
  }
}
filter {
  if [type] == "syslog" {
    # Split the raw syslog line into priority, timestamp, program, optional pid
    # and the remaining free-text message.
    grok {
      match => { "message" => "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    mutate { strip => ["syslog_message"] }
    if [syslog_program] == "devd" {
      if "!system=CAM" in [syslog_message] {
        grok {
          match => {"syslog_message" => "Processing event %{QUOTEDSTRING:data}"}
        }
        kv {source => "data"}
        mutate { remove_field => 'data' }
      }
      # NOTE(review): this drop{} discards ALL devd events, including the ones
      # just parsed above -- confirm that dropping devd entirely is intended.
      drop { }
    }
    if [syslog_program] == "filterlog" {
      opnsensefilter { field_name => "syslog_message" }
      geoip { source => "source" }
      # FIX: nested fields must be referenced as [geoip][ip]; the original
      # [geoip.ip] looks up a top-level field literally named "geoip.ip",
      # which never exists, so the first branch was unreachable.
      if [geoip][ip] {
        # NOTE(review): this writes the geo lookup of "source" INTO the
        # "destination" field -- looks like it overwrites the destination IP;
        # verify against the upstream opnsense-logstash-config intent.
        geoip {
          source => "source"
          target => "destination"
        }
      }
      if ![geoip][ip] {
        geoip { source => "destination" }
      }
    }
    if [syslog_program] == "suricata" {
      geoip { source => "src_ip" }
      # FIX: [geoip][ip] instead of the invalid dotted form [geoip.ip].
      if [geoip][ip] {
        geoip {
          source => "src_ip"
          target => "dest_ip"
        }
      }
      if ![geoip][ip] {
        geoip { source => "dest_ip" }
      }
    }
    if [syslog_program] == "opnsense" {
      # Authentication messages: "... for [user] '<name>' from[:] <ip>".
      if "for" in [syslog_message] and "from" in [syslog_message] {
        mutate { add_field => {'os_type' => 'auth'} }
        if "from:" in [syslog_message] {
          grok {
            match => {
              "syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for user '%{USERNAME:username}' from: %{DATA:ip}"
            }
          }
        } else {
          grok {
            match => {
              "syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for '%{USERNAME:username}' from %{DATA:ip}"
            }
          }
        }
      }
    }
    if [syslog_program] == "configd.py" {
      # FIX: [syslog_message] is a field reference; the original
      # ["syslog_message"] is a literal one-element list, so the substring
      # test was always false and this grok never ran.
      if "message" in [syslog_message] {
        grok {
          match => {
            "syslog_message" => "message %{UUID:uuid} \[%{DATA:action_name}\] returned %{WORD:status_word}.*"
          }
        }
      }
      if [syslog_message] =~ "^\[.+?\]" {
        grok {
          match => {"syslog_message" => "\[%{UUID:uuid}\] %{GREEDYDATA:configd_message}"}
        }
      }
      # FIX: "\S+*" stacked two quantifiers; "\S+" is what was meant.
      if [syslog_message] =~ "^\S+ generated \S+$" {
        grok {
          match => {"syslog_message" => "^%{NOTSPACE:component_name} generated %{NOTSPACE:file_name}$"}
        }
      }
      #mutate { remove_field => 'syslog_message' }
    }
    if [syslog_program] == "/usr/sbin/cron" {
      grok {
        match => {"syslog_message" => "\(%{USER:user}\) CMD %{GREEDYDATA:cron_message}"}
      }
      mutate { remove_field => 'syslog_message' }
    }
    if [syslog_program] in ["ospfd", "ospf6d"] {
      if ":" in [syslog_message] {
        grok {
          match => {"syslog_message" => "%{DATA:component}: %{GREEDYDATA:sub_message}"}
        }
      }
      # Second-level split, except for "# Areas" stats lines which keep their
      # colons inside sub_message for the SPF grok below.
      if ":" in [sub_message] and "# Areas" not in [sub_message] {
        grok {
          match => {"sub_message" => "%{DATA:subcomponent}: %{GREEDYDATA:msg}"}
        }
        mutate { remove_field => "sub_message" }
        mutate { rename => {"msg" => "sub_message"} }
      }
      if [syslog_message] =~ /^\S+\(\S+\).*/ {
        grok {
          match => {"syslog_message" => "%{NOTSPACE:component}\(%{NOTSPACE:function_name}\) %{GREEDYDATA:sub_message}"}
        }
      }
      if [component] == "SPF" {
        grok {
          match => {"sub_message" => "Scheduled in %{NUMBER:scheduled} msec"}
        }
      }
      if [component] == "SPF processing" {
        grok {
          match => {"sub_message" => "# Areas: %{NUMBER:number_areas}, SPF runtime: %{NUMBER:runtime_sec} sec %{NUMBER:runtime_usec} usec, Reason: %{GREEDYDATA:reason}"}
        }
      }
    }
    # Sample lines the OSPF groks above are written against:
    #"SPF processing: # Areas: 1, SPF runtime: 0 sec 0 usec, Reason: R+, R-"
    #"OSPF6d (Quagga-1.2.1 ospf6d-0.9.7r) starts: vty@2606"
    if [syslog_program] == "zebra" {
      #"client 18 says hello and bids fair to announce only ospf6 routes"
    }
  }
}
output {
  #stdout { codec => rubydebug }
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "logstash-opnsense-syslog-%{+YYYY.MM.dd}"
  }
}
--- End code ---
fabian:
Einfach in der Filtersektion das hier machen:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-remove_field
PS: In dem Fall funktioniert das Foto ;)
superwinni2:
Habe vor einiger Zeit die config mal wie folgt angepasst:
Kannst du bitte mal drüber schauen ob es so richtig ist?
--- Code: ---input {
  # Accept OPNsense syslog on the same port over both TCP and UDP.
  tcp {
    port => 5140
    type => syslog
  }
  udp {
    port => 5140
    type => syslog
  }
}
filter {
  if [type] == "syslog" {
    # Split the raw syslog line into priority, timestamp, program, optional pid
    # and the remaining free-text message.
    grok {
      match => { "message" => "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    mutate { strip => ["syslog_message"] }
    if [syslog_program] == "devd" {
      if "!system=CAM" in [syslog_message] {
        grok {
          match => {"syslog_message" => "Processing event %{QUOTEDSTRING:data}"}
        }
        kv {source => "data"}
        mutate { remove_field => 'data' }
      }
      # NOTE(review): this drop{} discards ALL devd events, including the ones
      # just parsed above -- confirm that dropping devd entirely is intended.
      drop { }
    }
    if [syslog_program] == "filterlog" {
      opnsensefilter { field_name => "syslog_message" }
      geoip { source => "source" }
      # FIX: nested fields must be referenced as [geoip][ip]; the original
      # [geoip.ip] looks up a top-level field literally named "geoip.ip",
      # which never exists, so the first branch was unreachable.
      if [geoip][ip] {
        # NOTE(review): this writes the geo lookup of "source" INTO the
        # "destination" field -- looks like it overwrites the destination IP;
        # verify against the upstream opnsense-logstash-config intent.
        geoip {
          source => "source"
          target => "destination"
        }
      }
      if ![geoip][ip] {
        geoip { source => "destination" }
      }
    }
    if [syslog_program] == "suricata" {
      geoip { source => "src_ip" }
      # FIX: [geoip][ip] instead of the invalid dotted form [geoip.ip].
      if [geoip][ip] {
        geoip {
          source => "src_ip"
          target => "dest_ip"
        }
      }
      if ![geoip][ip] {
        geoip { source => "dest_ip" }
      }
    }
    if [syslog_program] == "opnsense" {
      # Authentication messages: "... for [user] '<name>' from[:] <ip>".
      if "for" in [syslog_message] and "from" in [syslog_message] {
        mutate { add_field => {'os_type' => 'auth'} }
        if "from:" in [syslog_message] {
          grok {
            match => {
              "syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for user '%{USERNAME:username}' from: %{DATA:ip}"
            }
          }
        } else {
          grok {
            match => {
              "syslog_message" => "%{DATA:scriptname}: %{DATA:login_status} for '%{USERNAME:username}' from %{DATA:ip}"
            }
          }
        }
      }
    }
    if [syslog_program] == "configd.py" {
      # FIX: [syslog_message] is a field reference; the original
      # ["syslog_message"] is a literal one-element list, so the substring
      # test was always false and this grok never ran.
      if "message" in [syslog_message] {
        grok {
          match => {
            "syslog_message" => "message %{UUID:uuid} \[%{DATA:action_name}\] returned %{WORD:status_word}.*"
          }
        }
      }
      if [syslog_message] =~ "^\[.+?\]" {
        grok {
          match => {"syslog_message" => "\[%{UUID:uuid}\] %{GREEDYDATA:configd_message}"}
        }
      }
      # FIX: "\S+*" stacked two quantifiers; "\S+" is what was meant.
      if [syslog_message] =~ "^\S+ generated \S+$" {
        grok {
          match => {"syslog_message" => "^%{NOTSPACE:component_name} generated %{NOTSPACE:file_name}$"}
        }
      }
      #mutate { remove_field => 'syslog_message' }
    }
    if [syslog_program] == "/usr/sbin/cron" {
      grok {
        match => {"syslog_message" => "\(%{USER:user}\) CMD %{GREEDYDATA:cron_message}"}
      }
      mutate { remove_field => 'syslog_message' }
    }
    if [syslog_program] in ["ospfd", "ospf6d"] {
      if ":" in [syslog_message] {
        grok {
          match => {"syslog_message" => "%{DATA:component}: %{GREEDYDATA:sub_message}"}
        }
      }
      # Second-level split, except for "# Areas" stats lines which keep their
      # colons inside sub_message for the SPF grok below.
      if ":" in [sub_message] and "# Areas" not in [sub_message] {
        grok {
          match => {"sub_message" => "%{DATA:subcomponent}: %{GREEDYDATA:msg}"}
        }
        mutate { remove_field => "sub_message" }
        mutate { rename => {"msg" => "sub_message"} }
      }
      if [syslog_message] =~ /^\S+\(\S+\).*/ {
        grok {
          match => {"syslog_message" => "%{NOTSPACE:component}\(%{NOTSPACE:function_name}\) %{GREEDYDATA:sub_message}"}
        }
      }
      if [component] == "SPF" {
        grok {
          match => {"sub_message" => "Scheduled in %{NUMBER:scheduled} msec"}
        }
      }
      if [component] == "SPF processing" {
        grok {
          match => {"sub_message" => "# Areas: %{NUMBER:number_areas}, SPF runtime: %{NUMBER:runtime_sec} sec %{NUMBER:runtime_usec} usec, Reason: %{GREEDYDATA:reason}"}
        }
      }
    }
    # Sample lines the OSPF groks above are written against:
    #"SPF processing: # Areas: 1, SPF runtime: 0 sec 0 usec, Reason: R+, R-"
    #"OSPF6d (Quagga-1.2.1 ospf6d-0.9.7r) starts: vty@2606"
    if [syslog_program] == "zebra" {
      #"client 18 says hello and bids fair to announce only ospf6 routes"
    }
  }
  # Every field/key listed here is stripped before the event is sent to
  # Elasticsearch (removing a field that does not exist is a no-op, so this
  # is safe to run on every event type).
  mutate {
    remove_field => [
      "ack_number",
      "aid",
      "ecn",
      "direction_of_traffic",
      "flags",
      "[geoip][continent_code]",
      "[geoip][country_code2]",
      "[geoip][country_code3]",
      "[geoip][ip]",
      "[geoip][latitude]",
      "[geoip][longitude]",
      "[geoip][region_name]",
      "[geoip][region_code]",
      "[geoip][postal_code]",
      "[geoip][timezone]",
      "hop_limit",
      "ip_version",
      "length",
      "message",
      "myoffset",
      "options",
      "protocol_id",
      "reason",
      "sequence_number",
      "subrule",
      "syslog_message",
      "syslog_pri",
      "syslog_program",
      "syslog_timestamp",
      "tags",
      "tcp_flags",
      "tos",
      "type",
      "urgent_pointer",
      "window"
    ]
  }
}
output {
  #stdout { codec => rubydebug }
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "logstash-opnsense-syslog-%{+YYYY.MM.dd}"
  }
}
--- End code ---
Oder gibt es noch irgendwelche Verbesserungsvorschläge?
Danke und Gruß
Navigation
[0] Message Index
[#] Next page
Go to full version