Skip to content
Hunter Wu edited this page Jan 16, 2019 · 1 revision

Apache log & Ruby filter

input {
  # Read the Apache access log as a line-oriented event stream.
  file {
    # "beginning" only affects files Logstash has not tracked before;
    # files it already knows about resume from their recorded position.
    start_position => "beginning"
    path => "/var/log/apache2/access.log"
  }
}

filter {
  # Only events whose source path contains "access" are treated as Apache
  # access-log lines.
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }

    # Split the raw line into named fields (timestamp, clientip, request, ...).
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }

    # Derive a deterministic id from timestamp + client IP + request so that
    # re-ingesting the same log line overwrites the same Elasticsearch
    # document instead of creating a duplicate.
    #
    # Uses the event.get / event.set accessors: the legacy event['field']
    # syntax was removed in Logstash 5.0 and raises at runtime there.
    ruby {
      code => "require 'digest/md5';
        event.set('computed_id', Digest::MD5.hexdigest(event.get('timestamp') + ':' + event.get('clientip') + ':' + event.get('request')))"
    }
  }

  # Use the time recorded in the log line (not the ingestion time) as the
  # event's @timestamp.
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  # Index each event under the id computed in the filter stage, so replays of
  # the same log line update the existing document.
  elasticsearch {
    # `hosts` (a list of quoted strings) is the option name used by the
    # elasticsearch output since Logstash 2.0; the older `host` option and the
    # unquoted bareword value are rejected by current releases.
    hosts => [ "localhost" ]
    document_id => "%{computed_id}"
  }

  # Also print every event to the console for debugging.
  stdout { codec => rubydebug }
}

S3 input & output

  • Output to S3
input {
    # Source file whose lines are JSON documents; each decoded event is tagged
    # with a type so the output section can route it.
    file {
        path => ["/path/to/log"]
        codec => "json"
        type => "my_log_throught_s3"
    }
}
output {
  # Route only the events tagged by the matching file input.
  if [type] == "my_log_throught_s3" {
      s3 {
          # Destination bucket and key prefix.
          bucket => "your_bucket"
          prefix => "path/to/prefix/"

          # Placeholder credentials - fill these in before use.
          access_key_id => ""
          secret_access_key => ""

          # Events are buffered in a local file and written one JSON document
          # per line; time_file is in minutes, so the buffer is rotated and
          # uploaded every minute.
          temporary_directory => "/tmp/path/to/tmp_dir"
          codec => "json_lines"
          time_file => 1
      }
   }
}
  • Input from S3
input {
    # Pull objects under the given prefix from the bucket and decode their
    # contents as JSON events.
    s3 {
        bucket => "your_bucket"
        prefix => "path/to/prefix/"

        # Placeholder credentials - fill these in before use.
        access_key_id => ""
        secret_access_key => ""

        codec => "json"
        temporary_directory => "/tmp/path/to/tmp_dir"

        # Remove each object from the bucket once it has been processed.
        delete => true
    }
}
filter {
  if [type] == "my_log_throught_s3" {
    # Stash the document id under @metadata (available to later pipeline
    # stages but not written to the output document), then drop the fields
    # that should not be indexed.
    mutate {
      add_field => { "[@metadata][_id]" => "%{DocumentID}" }
      remove_field => ["path", "DocumentID"]
    }

    # Parse the event's own ISO8601 timestamp (interpreted as UTC) into
    # @timestamp; the source field is removed only when parsing succeeds.
    date {
      timezone => "UTC"
      match => [ "Timestamp" , "ISO8601" ]
      remove_field => ["Timestamp"]
    }
  }
}
output {
  if [type] == "my_log_throught_s3" {
    # Write into a daily index derived from the event's @timestamp, reusing
    # the id stashed in @metadata so re-processing an event updates the same
    # document.
    elasticsearch {
      hosts => [ "127.0.0.1:9200" ]
      index => "my_log-%{+YYYY.MM.dd}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
Clone this wiki locally