Skip to content

Commit

Permalink
Merge pull request #29 from slimm609/add_debug
Browse files Browse the repository at this point in the history
Update to ruby 3.0+, add debug
  • Loading branch information
slk244 authored Aug 24, 2023
2 parents a084f81 + f8b42aa commit b922a31
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
10 changes: 5 additions & 5 deletions fluent-plugin-vmware-log-intelligence.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ Gem::Specification.new do |s|
s.require_paths = ['lib']

s.add_dependency "fluentd", ">= 0.14.20"
s.add_dependency "http", ">= 0.9.8"
s.add_dependency "http", ">= 4.0"
s.add_dependency "myslog", "~> 0.0"
s.add_dependency "fluent-plugin-mysqlslowquery", ">= 0.0.9"
s.add_development_dependency "rake", ">= 0.9.2"
s.add_development_dependency "bundler", ">= 1.3.4"
s.add_development_dependency 'test-unit', '~> 3.1.0'
s.add_development_dependency 'webmock', '~> 3.4.0'
s.add_development_dependency "rake", ">= 13.0.0"
s.add_development_dependency "bundler", ">= 2.4.0"
s.add_development_dependency 'test-unit', '~> 3.5.0'
s.add_development_dependency 'webmock', '~> 3.10.0'
s.add_development_dependency 'fluent-plugin-detect-exceptions', '>= 0.0.12'
s.add_development_dependency 'fluent-plugin-concat', '>= 2.0.0'
s.add_development_dependency 'fluent-plugin-kubernetes_metadata_filter', '>= 2.0.0'
Expand Down
14 changes: 9 additions & 5 deletions lib/fluent/plugin/http_client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2018 VMware, Inc.
# Copyright 2023 VMware, Inc.
# SPDX-License-Identifier: MIT

module Fluent::Plugin
Expand All @@ -20,6 +20,7 @@ def initialize(endpoint_url, verify_ssl,
:connect_timeout => open_timeout,
:read_timeout => read_timeout
}
@log.debug "VMware Log Intelligence - Timeout Options: connect timeout #{open_timeout}, read timeout #{read_timeout}"

@conn = HTTP.persistent(endpoint_url)
.headers(headers)
Expand All @@ -29,19 +30,21 @@ def initialize(endpoint_url, verify_ssl,
@last_429_time = nil
end

def check_quota
def quota_reached
@log.debug "VMware Log Intelligence - Checking Rate Limit Quota"
if @last_429_time
if (Time.new - @last_429_time) < 600
return false
return true
end

@last_429_time = nil
end
return true
return false
end

def post(data)
if !check_quota
if quota_reached
@log.debug "VMware Log Intelligence - Rate limit quota reached"
return
end

Expand All @@ -54,6 +57,7 @@ def post(data)
else
@last_429_time = nil
end
@log.debug "VMware Log Intelligence - Response code from VMware Log Intelligence: #{response.code}"

if @statuses.include? response.code.to_i
# Raise an exception so that fluent will retry based on the configurations.
Expand Down
22 changes: 15 additions & 7 deletions lib/fluent/plugin/out_vmware_log_intelligence.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2013 ablagoev
# Copyright 2018 VMware, Inc.
# Copyright 2023 VMware, Inc.
# SPDX-License-Identifier: MIT


Expand Down Expand Up @@ -41,7 +41,7 @@ def initialize
end

def validate_uri(uri_string)
unless uri_string =~ /^#{URI.regexp}$/
unless uri_string =~ /^#{URI::DEFAULT_PARSER.make_regexp}$/
fail Fluent::ConfigError, 'endpoint_url invalid'
end

Expand All @@ -56,6 +56,7 @@ def retrieve_headers(conf)
headers = {}
conf.elements.each do |element|
if @http_compress
@log.debug "VMware Log Intelligence Compression enabled"
set_gzip_header(element)
end
if element.name == 'headers'
Expand Down Expand Up @@ -102,7 +103,8 @@ def create_lint_event(record)
keys.push(key)
key.force_encoding("utf-8")

if value.is_a?(String)
if value.is_a?(String)
@log.debug "VMware Log Intelligence force encoding"
value.force_encoding("utf-8")
end
end
Expand Down Expand Up @@ -130,7 +132,7 @@ def create_lint_event(record)

def flatten_record(record, prefix=[])
ret = {}

@log.debug "VMware Log Intelligence flattening record"
case record
when Hash
record.each do |key, value|
Expand Down Expand Up @@ -170,17 +172,20 @@ def multi_workers_ready?

def start
super
@log.debug "Started VMware Log Intelligence Shipper.."
end

def shutdown
super
@log.debug "Shutting Down VMware Log Intelligence Shipper.."
begin
@http_client.close if @http_client
rescue
end
end

def write(chunk)
@log.debug "VMware Log Intelligence writing message"
is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?)
if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
@log.info('Dropped request due to rate limiting')
Expand All @@ -193,12 +198,15 @@ def write(chunk)
end

if @http_compress
@log.debug "VMware Log Intelligence sending compressed message"
gzip_body = Zlib::GzipWriter.new(StringIO.new)
gzip_body << data.to_json
gzip_body << Yajl.dump(data)
@last_request_time = Time.now.to_f
@http_client.post(gzip_body.close.string)
else
else
@log.debug "VMware Log Intelligence sending uncompressed message"
@last_request_time = Time.now.to_f
@http_client.post(JSON.dump(data))
@http_client.post(Yajl.dump(data))
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/helper.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2013 ablagoev
# Copyright 2018 VMware, Inc.
# Copyright 2023 VMware, Inc.
# SPDX-License-Identifier: MIT

require 'coveralls'
Expand Down
16 changes: 8 additions & 8 deletions test/plugin/test_http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ def stub_server_out_of_quota(url=DEFAULT_URL)
stub_request(:post, url).to_return(:status => [429, "User out of ingestion quota."])
end

def test_check_quota
def test_quota_reached
http_client = create_http_client()
assert_equal http_client.check_quota, true
assert_equal http_client.quota_reached, false

stub_server_out_of_quota
http_client.post(JSON.dump(sample_record()))
assert_equal http_client.check_quota, false
http_client.post(Yajl.dump(sample_record()))
assert_equal http_client.quota_reached, true
end

def stub_server_unavailable(url=DEFAULT_URL)
Expand Down Expand Up @@ -72,24 +72,24 @@ def test_retry_on_response_status_code
http_client = create_http_client()
stub_server_returns_500
assert_raise RuntimeError do
http_client.post(JSON.dump(sample_record()))
http_client.post(Yajl.dump(sample_record()))
end
end

def test_server_raise_error
http_client = create_http_client()
stub_server_raise_error
assert_raise IOError do
http_client.post(JSON.dump(sample_record()))
http_client.post(Yajl.dump(sample_record()))
end
end

def test_post_logs
stub_post_logs
http_client = create_http_client()
http_client.post(JSON.dump(sample_record()))
http_client.post(Yajl.dump(sample_record()))

stub_post_logs
http_client.post(JSON.dump(sample_record()))
http_client.post(Yajl.dump(sample_record()))
end
end

0 comments on commit b922a31

Please sign in to comment.