Update specs to match new code
eherot committed Nov 5, 2023
1 parent e75a421 commit 9d85332
Showing 12 changed files with 85 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -116,4 +116,4 @@ It is more important to the community that you are able to contribute.

For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.

Arbitrary change to trigger a build #2
Arbitrary change to trigger a build
4 changes: 4 additions & 0 deletions lib/logstash/inputs/s3.rb
@@ -46,6 +46,10 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
# unless otherwise configured.
config :region, :validate => :string, :default => 'us-east-1'

config :access_key_id, :validate => :string, :default => nil

config :secret_access_key, :validate => :string, :default => nil

# If specified, the prefix of filenames in the bucket must match (not a regexp)
config :prefix, :validate => :string, :default => nil

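The two new settings let static AWS credentials be supplied directly in the plugin configuration. A minimal sketch of wiring them up, modeled on the plugin_config hash in the integration spec at the end of this diff (the bucket name and env-var choices are placeholders):

require "logstash/inputs/s3"

# Sketch only; bucket, region, and env-var names are illustrative.
plugin = LogStash::Inputs::S3.new(
  "bucket"            => "my-stuff",
  "region"            => "us-east-1",
  "access_key_id"     => ENV["AWS_ACCESS_KEY_ID"],
  "secret_access_key" => ENV["AWS_SECRET_ACCESS_KEY"]
)
plugin.register
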
2 changes: 2 additions & 0 deletions lib/logstash/inputs/s3/processor.rb
@@ -18,6 +18,8 @@ def handle(remote_file)
return if !validator.process?(remote_file)
@logger.debug("Remote file passed validation. Downloading data.", :remote_file => remote_file)

puts "remote_file.content_length: #{remote_file.content_length} which is > 0. Downloading!"

remote_file.download!

@logger.debug("File downloaded. Emitting events.", :remote_file => remote_file)
2 changes: 1 addition & 1 deletion lib/logstash/inputs/s3/remote_file.rb
@@ -85,7 +85,7 @@ def compressed_gzip?
# Usually I would use the content_type to retrieve this information,
# but that requires another call to S3 for each download, which isn't really optimal.
# So we will use the filename to do a best guess at the content type.
::File.extname(remote_object.key).downcase == @gzip_pattern
::File.extname(remote_object.key).downcase =~ Regexp.new(@gzip_pattern)
end

def inspect
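
With plain string equality, the check could only succeed if the pattern were the literal extension itself; compiling the pattern makes regex-style values such as the \.gz(ip)?$ used in remote_file_spec.rb below work as intended. A standalone sketch of the difference:

ext = ::File.extname("logs/app.gz").downcase   # => ".gz"
pattern = '\.gz(ip)?$'                         # regex source, as in remote_file_spec.rb

ext == pattern                    # => false; string comparison never matches
!!(ext =~ Regexp.new(pattern))    # => true; the compiled regex matches ".gz"
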
5 changes: 3 additions & 2 deletions lib/logstash/inputs/s3/sincedb.rb
@@ -99,12 +99,13 @@ def reseed(remote_file)
private
attr_reader :options

def start_bookkeeping
def start_bookkeeping
raise 'never let this run'
@stopped.make_false

Thread.new do
LogStash::Util.set_thread_name("S3 input, sincedb periodic fsync")
Stud.interval(1) { periodic_sync }
Stud.interval(1) { puts 'Running bookkeeper'; periodic_sync }
end
end

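For context on the surrounding code: Stud.interval (from the stud gem) blocks its calling thread and yields on a fixed cadence, which is why start_bookkeeping wraps it in a dedicated, named thread. A minimal standalone sketch, assuming only the stud gem:

require "stud/interval"

# Runs the block roughly once per second until the thread is killed.
bookkeeper = Thread.new do
  Stud.interval(1) { puts "periodic_sync tick" }
end

sleep 3
bookkeeper.kill
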
16 changes: 14 additions & 2 deletions spec/inputs/s3/event_processor_spec.rb
@@ -6,17 +6,29 @@
require "thread"

describe LogStash::Inputs::S3::EventProcessor do
let(:logstash_inputs_s3) { double("logstash-inputs-s3") }
let(:include_object_properties) { true }
let(:logger) { double("Logger").as_null_object }
let(:metadata) { { "s3" => { "bucket_name" => "bucket-land" } } }
let(:encoded_line) { LogStash::Json.dump({ "message" => "Hello World" }) }
let(:codec) { LogStash::Codecs::JSON.new }
let(:queue) { Queue.new }

let(:remote_file_data) { { "bucket_name" => "bucket-land" } }

before do
described_class.new(codec, queue).process(encoded_line, metadata)
described_class.new(logstash_inputs_s3, codec, queue, include_object_properties, logger).process(encoded_line, metadata, remote_file_data)
end

subject { queue.pop }

it "queue should have things in it" do
expect(queue).not_to be_empty
end

it "Event object should not be nil" do
expect(subject).not_to be_nil
end

it "uses the codec and insert the event to the queue" do
expect(subject["message"]).to eq("Hello World")
end
13 changes: 10 additions & 3 deletions spec/inputs/s3/poller_spec.rb
@@ -6,16 +6,23 @@
require "rspec/wait"

describe LogStash::Inputs::S3::Poller do
let(:sincedb) { double("sincedb") }
let(:sincedb) { double("sincedb").as_null_object }
let(:logger) { double("logger").as_null_object }
let(:bucket_name) { "my-stuff" }
let(:bucket) { Aws::S3::Bucket.new(:stub_responses => true, :name => bucket_name) }
let(:remote_objects) { double("remote_objects") }
let(:objects) { [OpenStruct.new({:key => "myobject", :last_modified => Time.now-60, :body => "Nooo" })] }

before :each do
allow(bucket).to receive(:objects).with(anything).and_return(objects)
allow(bucket).to receive(:objects).with(anything).and_return(remote_objects)
allow(remote_objects).to receive(:limit).with(anything) do |num|
expect(num).to be_a(Integer)
expect(num).to be > 0
objects
end
end

subject { described_class.new(bucket) }
subject { described_class.new(bucket, sincedb, logger) }

it "lists the files from the remote host" do
retrieved_objects = []
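
The replacement stub uses an RSpec implementation block: the block receives the actual call's argument, so the expectations run on every limit call, and the block's return value becomes the stub's return value. A generic sketch of the pattern (the names and return value are illustrative, not from the repo):

collection = double("collection")
allow(collection).to receive(:limit) do |num|
  expect(num).to be_a(Integer)   # fails the example if num has the wrong type
  expect(num).to be > 0
  [:page_of_results]             # value handed back to the caller
end

collection.limit(100)            # => [:page_of_results]
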
15 changes: 12 additions & 3 deletions spec/inputs/s3/post_processor_spec.rb
@@ -5,7 +5,9 @@
require "stud/temporary"

describe LogStash::Inputs::S3::PostProcessor do
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object) }
let(:logger) { double("logger").as_null_object }
let(:gzip_pattern) { "*.gz" }
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
let(:s3_object) { double("s3_object",
:key => "hola",
:bucket_name => "mon-bucket",
@@ -16,15 +18,22 @@
describe LogStash::Inputs::S3::PostProcessor::UpdateSinceDB do
let(:ignore_older) { 3600 }
let(:sincedb_path) { Stud::Temporary.file.path }
let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, ignore_older) }
let(:logger) { double("logger").as_null_object }

before do
# Avoid starting the bookkeeping thread since it will keep running after the test
allow_any_instance_of(LogStash::Inputs::S3::SinceDB).to receive(:start_bookkeeping)
end

let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, ignore_older, logger) }

subject { described_class.new(sincedb) }

after :each do
File.delete(sincedb_path)
end

it "make the remote file as completed" do
it "mark the remote file as completed" do
subject.process(remote_file)
expect(sincedb.processed?(remote_file)).to be_truthy
end
21 changes: 15 additions & 6 deletions spec/inputs/s3/proccessing_policy_validator_spec.rb
@@ -5,7 +5,9 @@
require "stud/temporary"

describe LogStash::Inputs::S3::ProcessingPolicyValidator do
let(:remote_file) { RemoteFile.new(s3_object) }
let(:logger) { double("logger").as_null_object }
let(:gzip_pattern) { "*.gz" }
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
let(:s3_object) { double("s3_object", :key => "hola", :content_length => 20, :last_modified => Time.now-60) }

let(:validator_1) { LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile }
@@ -15,16 +17,16 @@
subject { described_class }

it "accepts multiples validator" do
expect(subject.new(validator_1, validator_2).count).to eq(2)
expect(subject.new(logger, validator_1, validator_2).count).to eq(2)
end

it "accepts one validator" do
expect(subject.new(validator_1).count).to eq(1)
expect(subject.new(logger, validator_1).count).to eq(1)
end
end

context "#add_policy" do
subject { described_class.new(validator_1) }
subject { described_class.new(logger, validator_1) }

it "allows to add more validators" do
expect(subject.count).to eq(1)
@@ -43,7 +45,7 @@
end

context "#process?" do
subject { described_class.new(validator_1, validator_2) }
subject { described_class.new(logger, validator_1, validator_2) }

it "execute the validator in declarations order" do
expect(validator_1).to receive(:process?).ordered.and_return(true)
@@ -133,7 +135,14 @@
let(:older_than) { 3600 }
let(:s3_object) { double("remote_file", :etag => "1234", :bucket_name => "mon-bucket", :key => "hola", :content_length => 100, :last_modified => Time.now) }
let(:sincedb_path) { Stud::Temporary.file.path }
let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, older_than) }
let(:logger) { double("logger").as_null_object }

before do
# Avoid starting the bookkeeping thread since it will keep running after the test
allow_any_instance_of(LogStash::Inputs::S3::SinceDB).to receive(:start_bookkeeping)
end

let(:sincedb) { LogStash::Inputs::S3::SinceDB.new(sincedb_path, older_than, logger) }

subject { described_class.new(sincedb) }

12 changes: 8 additions & 4 deletions spec/inputs/s3/processor_spec.rb
@@ -11,18 +11,22 @@
let(:post_processors) { [post_processor_1, post_processor_2] }

let(:validator) { LogStash::Inputs::S3::ProcessingPolicyValidator.new(LogStash::Inputs::S3::ProcessingPolicyValidator::SkipEmptyFile) }
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object) }
let(:logger) { double("Logger").as_null_object }
let(:gzip_pattern) { "*.gz" }
let(:remote_file) { LogStash::Inputs::S3::RemoteFile.new(s3_object, logger, gzip_pattern) }
let(:s3_object) { double("s3_object",
:data => { "bucket_name" => "mon-bucket" },
:key => "hola",
:bucket_name => "mon-bucket",
:content_length => 20,
:last_modified => Time.now-60) }

subject { described_class.new(validator, event_processor, post_processors) }
subject { described_class.new(validator, event_processor, logger, post_processors) }

context "When handling remote file" do
context "when the file is not valid to process" do
context "when the file is not valid to process (because content_length = 0)" do
let(:s3_object) { double("s3_object",
:data => { "bucket_name" => "mon-bucket" },
:key => "hola",
:content_length => 0,
:last_modified => Time.now-60) }
@@ -44,7 +48,7 @@

it "send the file content to the event processor" do
subject.handle(remote_file)
expect(event_processor).to have_received(:process).with(content, { "s3" => hash_including(metadata["s3"])})
expect(event_processor).to have_received(:process).with(content, { "s3" => hash_including(metadata["s3"])}, s3_object.data)
end

it "sends the file to all post processors" do
11 changes: 7 additions & 4 deletions spec/inputs/s3/remote_file_spec.rb
@@ -2,13 +2,16 @@
require "logstash/inputs/s3/remote_file"

describe LogStash::Inputs::S3::RemoteFile do
subject { described_class.new(s3_object) }
let(:logger) { double("logger").as_null_object }
let(:gzip_pattern) { "\.gz(ip)?$" }

subject { described_class.new(s3_object, logger, gzip_pattern) }

context "#compressed_gzip?" do
context "when `content_type` is `application/gzip`" do
context "when remote object key ends in .gz" do
let(:s3_object) { double("s3_object",
:content_type => "application/gzip",
:key => "hola",
:key => "hola.gz",
:content_length => 20,
:last_modified => Time.now-60) }

@@ -17,7 +20,7 @@
end
end

context "when `content_type` is not `application/gzip`" do
context "when remote object key ends in something else" do
let(:s3_object) { double("s3_object",
:content_type => "text/plain",
:key => "hola",
9 changes: 8 additions & 1 deletion spec/integration/retrieve_logs_spec.rb
@@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/inputs/s3"
require "logstash/inputs/s3/sincedb"
require_relative "../support/matcher_helpers"
require_relative "../support/s3_input_test_helper"
require "stud/temporary"
@@ -16,6 +17,12 @@

describe "Retrieve logs from S3", :tags => :integration do
let(:queue) { Queue.new }
let(:stub_since_db) { double("since_db") }

before do
# Stub this out so that we can avoid starting the bookkeeper thread which doesn't die
allow(LogStash::Inputs::S3::SinceDB).to receive(:new).with(anything).and_return(stub_since_db)
end

let(:plugin) { LogStash::Inputs::S3.new(plugin_config) }

@@ -35,7 +42,7 @@
let(:region) { REGION }

let(:plugin_config) do
super.merge({
super().merge({
"access_key_id" => access_key_id,
"secret_access_key" => secret_access_key,
"region" => region,
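
The super to super() change in the last hunk matters because RSpec defines let blocks via define_method, and Ruby rejects a bare super (implicit argument forwarding) inside such methods at runtime; super() explicitly forwards no arguments. A standalone illustration of the rule:

class Base
  def plugin_config
    { "region" => "us-east-1" }
  end
end

class Child < Base
  define_method(:plugin_config) do
    # super          # RuntimeError: implicit argument passing of super
    #                # from method defined by define_method() is not supported
    super().merge("bucket_name" => "my-stuff")   # explicit parens are fine
  end
end

Child.new.plugin_config  # => {"region"=>"us-east-1", "bucket_name"=>"my-stuff"}
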
