From 618f727e34533786f8411331376c20a777234228 Mon Sep 17 00:00:00 2001 From: David Feltell Date: Tue, 23 Apr 2019 17:12:27 +0100 Subject: [PATCH] [RD-34614] Support multiline logs from multiple log groups * The logstash multiline codec plugin cannot, by default, differentiate between log lines from different sources. This means child lines could be grouped under the wrong parent line if multiple streams (CloudWatch log groups) are interleaved. * However, the multiline plugin provides a barely-documented pseudo-codec wrapper, which augments a given underlying codec with an "identity map". * So use this wrapper, passing the CloudWatch log group as the `identity`. --- lib/logstash/inputs/cloudwatch_logs.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb index 38d3cc6..5421c1f 100644 --- a/lib/logstash/inputs/cloudwatch_logs.rb +++ b/lib/logstash/inputs/cloudwatch_logs.rb @@ -3,6 +3,7 @@ require "logstash/namespace" require "logstash/plugin_mixins/aws_config" require "logstash/timestamp" +require "logstash/codecs/identity_map_codec" require "time" require "stud/interval" require "aws-sdk" @@ -93,7 +94,9 @@ def register @logger.info("No sincedb_path set, generating one based on the log_group setting", :sincedb_path => @sincedb_path, :log_group => @log_group) end - + # Wrap codec in multiline plugin's pseudo-codec, identity_map_codec, to support multiline + # streams from multiple sources (i.e. log groups). + @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) end #def register public @@ -208,7 +211,7 @@ def process_group(group) private def process_log(log, group) - @codec.decode(log.message.to_str) do |event| + @codec.decode(log.message.to_str, group) do |event| event.set("@timestamp", parse_time(log.timestamp)) event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time)) event.set("[cloudwatch_logs][log_group]", group)