Backport #1199 Properly handle 413 Payload Too Large errors (#1199) #1206

Merged 1 commit on Jan 28, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,5 @@
## 11.22.12
- Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)
## 11.22.11
- Remove irrelevant log warning about elastic stack version [#1202](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1202)

17 changes: 16 additions & 1 deletion docs/index.asciidoc
@@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elasticsearch. These requests may experience
either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP
request are handled differently than error codes for individual documents.

HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
including 413 (Payload Too Large) responses.

If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:

[source,ruby]
-----
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying
  }
}
-----

This captures oversized payloads in the DLQ for later analysis or reprocessing instead of retrying them indefinitely. It requires the dead letter queue to be enabled in `logstash.yml` (`dead_letter_queue.enable: true`).
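
For later inspection, a separate pipeline can read the captured events back with the `dead_letter_queue` input plugin. A minimal sketch, assuming the default pipeline id `main` and a package-style data path (adjust `path` to your installation):

[source,ruby]
-----
input {
  dead_letter_queue {
    path => "/usr/share/logstash/data/dead_letter_queue"  # assumed location of the DLQ on disk
    pipeline_id => "main"                                  # id of the pipeline that wrote the entries
  }
}

output {
  stdout { codec => rubydebug { metadata => true } }  # metadata includes the recorded failure reason
}
-----

Events read this way keep their original fields plus DLQ metadata describing why they were rejected, so oversized documents can be trimmed, split, or re-indexed from there.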

The following document errors are handled as follows:

49 changes: 25 additions & 24 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
def bulk_send(body_stream, batch_actions)
  params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}

-  response = @pool.post(@bulk_path, params, body_stream.string)
-
-  @bulk_response_metrics.increment(response.code.to_s)
-
-  case response.code
-  when 200 # OK
-    LogStash::Json.load(response.body)
-  when 413 # Payload Too Large
+  begin
+    response = @pool.post(@bulk_path, params, body_stream.string)
+    @bulk_response_metrics.increment(response.code.to_s)
+  rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+    @bulk_response_metrics.increment(e.response_code.to_s)
+    raise e unless e.response_code == 413
+    # special handling for 413, treat it as a document level issue
    logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
-    emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
-  else
-    url = ::LogStash::Util::SafeURI.new(response.final_url)
-    raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-      response.code, url, body_stream.to_s, response.body
-    )
+    return emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
+  rescue => e # it may be a network issue instead, re-raise
+    raise e
  end

+  LogStash::Json.load(response.body)
end
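
What the new rescue branch hands back is the key: rather than letting the 413 escape and be retried forever, it fabricates a bulk-style response in which every action of the batch is marked as failed with status 413, so the caller's existing document-level handling (retry, or DLQ routing when 413 is listed in dlq_custom_codes) takes over. A rough, illustrative sketch of that shape for a one-action batch; the exact item structure is built by emulate_batch_error_response below:

# Illustrative only: approximate shape of the emulated bulk response for a
# single-action batch rejected with 413. Because it looks like an ordinary
# bulk reply with per-item errors, the existing document-level logic can
# process it like any other partial failure.
batch_actions = [
  [:index, { :_index => "logs" }, { "message" => "an oversized event" }]
]

failed_item = { "index" => { "status" => 413, "error" => { "type" => "payload_too_large" } } }

emulated = {
  "errors" => true,
  "items"  => batch_actions.map { failed_item }  # one failed item per action in the batch
}

puts emulated.inspect
# {"errors"=>true, "items"=>[{"index"=>{"status"=>413, "error"=>{"type"=>"payload_too_large"}}}]}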

def emulate_batch_error_response(actions, http_code, reason)
@@ -411,6 +409,9 @@ def host_to_url(h)
def exists?(path, use_get=false)
response = use_get ? @pool.get(path) : @pool.head(path)
response.code >= 200 && response.code <= 299
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return false if e.response_code == 404
raise e
end

def template_exists?(template_endpoint, name)
@@ -421,6 +422,8 @@ def template_put(template_endpoint, name, template)
path = "#{template_endpoint}/#{name}"
logger.info("Installing Elasticsearch template", name: name)
@pool.put(path, nil, LogStash::Json.dump(template))
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
raise e unless e.response_code == 404
end

# ILM methods
@@ -432,17 +435,15 @@ def rollover_alias_exists?(name)

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
-  begin
-    @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
-    logger.info("Created rollover alias", name: alias_name)
-  # If the rollover alias already exists, ignore the error that comes back from Elasticsearch
-  rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-    if e.response_code == 400
-      logger.info("Rollover alias already exists, skipping", name: alias_name)
-      return
-    end
-    raise e
-  end
+  @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
+  logger.info("Created rollover alias", name: alias_name)
+  # If the rollover alias already exists, ignore the error that comes back from Elasticsearch
+rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+  if e.response_code == 400
+    logger.info("Rollover alias already exists, skipping", name: alias_name)
+    return
+  end
+  raise e
end
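
The rollover_alias_put change above, like the health_check_request and get_root_path changes in pool.rb further down, is the same mechanical refactor: when a begin/rescue/end block wraps an entire method body, Ruby lets the rescue clause attach directly to the method definition, so the wrapper and one level of indentation can be dropped without changing behavior. A standalone illustration of the idiom (hypothetical methods, not plugin code):

def risky_call
  raise "boom"  # stand-in for a call that may fail
end

# Before: explicit begin/end wrapper around the whole method body.
def fetch_with_wrapper
  begin
    risky_call
  rescue StandardError => e
    warn("failed: #{e.message}")
    nil
  end
end

# After: the rescue clause attaches to the def itself; behavior is identical.
def fetch_without_wrapper
  risky_call
rescue StandardError => e
  warn("failed: #{e.message}")
  nil
end

fetch_with_wrapper     # prints "failed: boom" to stderr, returns nil
fetch_without_wrapper  # prints "failed: boom" to stderr, returns nil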

def get_xpack_info
lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
@@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
end

-# 404s are excluded because they are valid codes in the case of
-# template installation. We might need a better story around this later
-# but for our current purposes this is correct
code = resp.code
-if code < 200 || code > 299 && code != 404
+if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
end

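With the 404 carve-out removed, perform_request raises BadResponseCodeError for every non-2xx status, and callers that treat a 404 as a normal answer now translate it themselves, as the exists? and template_put changes above do. A self-contained, hypothetical sketch of that caller-side pattern (FakePool and FakeError are stand-ins, not plugin classes):

# Hypothetical, self-contained sketch: a 404 raised by the HTTP layer becomes
# a nil result ("not found"), while every other bad status keeps propagating.
class FakeError < StandardError
  attr_reader :response_code
  def initialize(code)
    @response_code = code
    super("bad response code #{code}")
  end
end

class FakePool
  def get(path)
    raise FakeError.new(404) if path == "/missing"
    "200 OK body"
  end
end

def fetch_or_nil(pool, path)
  pool.get(path)
rescue FakeError => e
  raise e unless e.response_code == 404
  nil
end

pool = FakePool.new
p fetch_or_nil(pool, "/present")  # => "200 OK body"
p fetch_or_nil(pool, "/missing")  # => nil
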
24 changes: 10 additions & 14 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -253,13 +253,11 @@ def get_license(url)
def health_check_request(url)
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
-  begin
-    response = perform_request_to_url(url, :head, @healthcheck_path)
-    return response, nil
-  rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-    logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
-    return nil, e
-  end
+  response = perform_request_to_url(url, :head, @healthcheck_path)
+  return response, nil
+rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+  logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
+  return nil, e
end

def healthcheck!(register_phase = true)
@@ -312,13 +310,11 @@ def healthcheck!(register_phase = true)
end

def get_root_path(url, params={})
-  begin
-    resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
-    return resp, nil
-  rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-    logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
-    return nil, e
-  end
+  resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
+  return resp, nil
+rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+  logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
+  return nil, e
end

def test_serverless_connection(url, root_response)
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
-  s.version = '11.22.11'
+  s.version = '11.22.12'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
7 changes: 6 additions & 1 deletion spec/unit/outputs/elasticsearch_spec.rb
@@ -915,7 +915,12 @@
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
if body.length > max_bytes
max_bytes *= 2 # ensure a successful retry
double("Response", :code => 413, :body => "")
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
413,
"test-url",
body,
""
)
else
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
end