From 85fce89ac12300ed7ebf3048420a4f5466c25ed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Tue, 29 Mar 2022 14:04:53 +0200 Subject: [PATCH] feat: add `Retry-After` header as FlowFile attribute (#67) --- CHANGELOG.md | 3 +++ .../processors/PutInfluxDatabaseRecord_2.java | 5 ++++ .../nifi/processors/PutInfluxDatabase_2.java | 6 +++++ .../AbstractInfluxDatabaseProcessor.java | 1 + .../AbstractInfluxDatabaseProcessor_2.java | 23 +++++++++++++++++++ ...utInfluxDatabaseRecordErrorHandling_2.java | 18 +++++++++++++++ .../processors/TestPutInfluxDatabase_2.java | 22 ++++++++++++++++++ .../org/influxdata/nifi/processors/Utils.java | 23 +++++++++++++------ 8 files changed, 94 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9dd4826..8258f5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## v1.16.0 [unreleased] +### Features +1. [#67](https://github.com/influxdata/nifi-influxdb-bundle/pull/67): Add `Retry-After` header value when a FlowFile is transferred to `Retry` output + ### Others 1. [#65](https://github.com/influxdata/nifi-influxdb-bundle/pull/65): Update to Apache NiFi 1.15.3 diff --git a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabaseRecord_2.java b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabaseRecord_2.java index 1715750..8bd7920 100644 --- a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabaseRecord_2.java +++ b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabaseRecord_2.java @@ -52,6 +52,7 @@ import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_FAIL_TO_INSERT; +import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.RECORD_READER_FACTORY; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_FAILURE; @@ -180,6 +181,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (Arrays.asList(429, 503).contains(ie.status()) || ie.getCause() instanceof SocketTimeoutException) { getLogger().error("Failed to insert into influxDB due {} to {} and retrying", new Object[]{ie.status(), ie.getLocalizedMessage()}, ie); + String retryAfterHeader = getRetryAfterHeader(ie); + if (StringUtils.isNoneBlank(retryAfterHeader)) { + flowFile = session.putAttribute(flowFile, INFLUX_DB_RETRY_AFTER, retryAfterHeader); + } session.penalize(flowFile); session.transfer(flowFile, REL_RETRY); } else { diff --git a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabase_2.java b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabase_2.java index 3273d2f..9d9658c 100644 --- a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/PutInfluxDatabase_2.java @@ -31,6 +31,7 @@ import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor_2; import org.influxdata.nifi.util.PropertyValueUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -54,6 +55,7 @@ import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.CHARSET; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_FAIL_TO_INSERT; +import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE; /** @@ -168,6 +170,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (Arrays.asList(429, 503).contains(ie.status()) || ie.getCause() instanceof SocketTimeoutException) { getLogger().error("Failed to insert into influxDB due {} to {} and retrying", new Object[]{ie.status(), ie.getLocalizedMessage()}, ie); + String retryAfterHeader = getRetryAfterHeader(ie); + if (StringUtils.isNoneBlank(retryAfterHeader)) { + flowFile = session.putAttribute(flowFile, INFLUX_DB_RETRY_AFTER, retryAfterHeader); + } session.transfer(flowFile, REL_RETRY); } else { getLogger().error(INFLUX_DB_FAIL_TO_INSERT, new Object[]{ie.getLocalizedMessage()}, ie); diff --git a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor.java b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor.java index 8cfab3f..5b38463 100644 --- a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor.java +++ b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor.java @@ -129,6 +129,7 @@ public abstract class AbstractInfluxDatabaseProcessor extends AbstractProcessor public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message"; + public static final String INFLUX_DB_RETRY_AFTER = "influxdb.retry-after"; public static final String INFLUX_DB_ERROR_MESSAGE_LOG = "Failed procession flow file {} due to {}"; public static final String INFLUX_DB_FAIL_TO_INSERT = "Failed to insert into influxDB due to {}"; public static final String INFLUX_DB_FAIL_TO_QUERY = "Failed to execute Flux query due {} to {}"; diff --git a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor_2.java b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor_2.java index 967ce1d..288f2dc 100644 --- a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor_2.java +++ b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractInfluxDatabaseProcessor_2.java @@ -16,13 +16,16 @@ */ package org.influxdata.nifi.processors.internal; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import com.influxdb.LogLevel; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.domain.WritePrecision; +import com.influxdb.exceptions.InfluxException; import org.influxdata.nifi.services.InfluxDatabaseService_2; import edu.umd.cs.findbugs.annotations.NonNull; @@ -36,6 +39,7 @@ import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; +import retrofit2.Response; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE; import static org.influxdata.nifi.util.PropertyValueUtils.getEnumValue; @@ -196,6 +200,25 @@ public synchronized InfluxDBClient getInfluxDBClient(final ProcessContext contex return influxDBClient.get(); } + @Nullable + protected String getRetryAfterHeader(InfluxException ie) { + try { + // + // Temporally solution before release https://github.com/influxdata/influxdb-client-java/pull/317 + // + Field responseField = InfluxException.class.getDeclaredField("response"); + responseField.setAccessible(true); + Response response = (Response) responseField.get(ie); + if (response != null) { + return response.headers().get("Retry-After"); + } + } catch (Exception e) { + return null; + } + + return null; + } + /** * Configure LogLevel and GZIP. */ diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java index afa009d..11c70e8 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java @@ -17,6 +17,7 @@ package org.influxdata.nifi.processors; import java.net.SocketTimeoutException; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -34,12 +35,16 @@ import org.influxdata.nifi.util.InfluxDBUtils; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import static org.influxdata.nifi.processors.Utils.createErrorResponse; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE; +import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER; +import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_RETRY; import static org.influxdata.nifi.util.InfluxDBUtils.AT_LEAST_ONE_FIELD_DEFINED_MESSAGE; import static org.influxdata.nifi.util.InfluxDBUtils.MissingItemsBehaviour.FAIL; import static org.influxdata.nifi.util.InfluxDBUtils.MissingItemsBehaviour.IGNORE; +import static org.junit.Assert.assertEquals; /** * @author Jakub Bednar (bednar@github) (18/07/2019 07:24) @@ -127,6 +132,19 @@ public void tooManyRequestsException() { new InfluxException(createErrorResponse(429)), "Simulate error: 429"); } + @Test + public void tooManyRequestsExceptionRetryAfterHeader() { + + Map headers = Collections.singletonMap("Retry-After", "145"); + + errorToRetryRelationship( + new InfluxException(createErrorResponse(429, headers)), "Simulate error: 429"); + + List flowFiles = runner.getFlowFilesForRelationship(REL_RETRY); + + assertEquals("145", flowFiles.get(0).getAttribute(INFLUX_DB_RETRY_AFTER)); + } + @Test public void notFoundException() { diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabase_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabase_2.java index 123cbaf..96ed898 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabase_2.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.security.GeneralSecurityException; +import java.util.Collections; import java.util.List; +import java.util.Map; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.WriteApiBlocking; @@ -41,6 +43,7 @@ import static org.influxdata.nifi.processors.Utils.createErrorResponse; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.CHARSET; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE; +import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_FAILURE; import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_MAX_SIZE_EXCEEDED; @@ -253,6 +256,25 @@ public void testWriteTooManyRequestsExceptionRetry() { assertEquals("Simulate error: 429", flowFiles.get(0).getAttribute(INFLUX_DB_ERROR_MESSAGE)); } + @Test + public void testWriteTooManyRequestsExceptionRetryAfterHeader() { + + byte[] bytes = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1".getBytes(); + + Map headers = Collections.singletonMap("Retry-After", "149"); + + Mockito.doThrow(new InfluxException(createErrorResponse(429, headers))).when(mockWriteApi) + .writeRecord(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(REL_RETRY, 1); + + List flowFiles = runner.getFlowFilesForRelationship(REL_RETRY); + + assertEquals("149", flowFiles.get(0).getAttribute(INFLUX_DB_RETRY_AFTER)); + } + @Test public void testCannotInstantiateInfluxDBClient() throws InitializationException, IOException, GeneralSecurityException { diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/Utils.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/Utils.java index c1f5709..fff8091 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/Utils.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/Utils.java @@ -16,6 +16,10 @@ */ package org.influxdata.nifi.processors; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + import okhttp3.MediaType; import okhttp3.Protocol; import okhttp3.Request; @@ -24,16 +28,21 @@ class Utils { - static Response createErrorResponse(final int code) { - okhttp3.Response build = new okhttp3.Response.Builder() // - .code(code).addHeader("X-Influx-Error", "Simulate error: " + code) + static Response createErrorResponse(final int code) { + return createErrorResponse(code, new HashMap<>()); + } + + static Response createErrorResponse(final int code, @Nonnull final Map headers) { + okhttp3.Response.Builder builder = new okhttp3.Response.Builder() // + .code(code) + .addHeader("X-Influx-Error", "Simulate error: " + code) .message("Response.error()") .protocol(Protocol.HTTP_1_1) - .request(new Request.Builder().url("http://localhost/").build()) - .build(); + .request(new Request.Builder().url("http://localhost/").build()); + + headers.forEach(builder::addHeader); - return Response - .error(ResponseBody.create(MediaType.parse("application/json"), ""), build); + return Response.error(ResponseBody.create("", MediaType.parse("application/json")), builder.build()); } }