Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
feat: add Retry-After header as FlowFile attribute (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Mar 29, 2022
1 parent 6cd9b34 commit 85fce89
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## v1.16.0 [unreleased]

### Features
1. [#67](https://github.com/influxdata/nifi-influxdb-bundle/pull/67): Add `Retry-After` header value when a FlowFile is transferred to `Retry` output

### Others
1. [#65](https://github.com/influxdata/nifi-influxdb-bundle/pull/65): Update to Apache NiFi 1.15.3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_FAIL_TO_INSERT;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.RECORD_READER_FACTORY;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_FAILURE;
Expand Down Expand Up @@ -180,6 +181,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (Arrays.asList(429, 503).contains(ie.status()) || ie.getCause() instanceof SocketTimeoutException) {
getLogger().error("Failed to insert into influxDB due {} to {} and retrying",
new Object[]{ie.status(), ie.getLocalizedMessage()}, ie);
String retryAfterHeader = getRetryAfterHeader(ie);
if (StringUtils.isNoneBlank(retryAfterHeader)) {
flowFile = session.putAttribute(flowFile, INFLUX_DB_RETRY_AFTER, retryAfterHeader);
}
session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor_2;
import org.influxdata.nifi.util.PropertyValueUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
Expand All @@ -54,6 +55,7 @@
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.CHARSET;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_FAIL_TO_INSERT;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE;

/**
Expand Down Expand Up @@ -168,6 +170,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (Arrays.asList(429, 503).contains(ie.status()) || ie.getCause() instanceof SocketTimeoutException) {
getLogger().error("Failed to insert into influxDB due {} to {} and retrying",
new Object[]{ie.status(), ie.getLocalizedMessage()}, ie);
String retryAfterHeader = getRetryAfterHeader(ie);
if (StringUtils.isNoneBlank(retryAfterHeader)) {
flowFile = session.putAttribute(flowFile, INFLUX_DB_RETRY_AFTER, retryAfterHeader);
}
session.transfer(flowFile, REL_RETRY);
} else {
getLogger().error(INFLUX_DB_FAIL_TO_INSERT, new Object[]{ie.getLocalizedMessage()}, ie);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public abstract class AbstractInfluxDatabaseProcessor extends AbstractProcessor


public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message";
public static final String INFLUX_DB_RETRY_AFTER = "influxdb.retry-after";
public static final String INFLUX_DB_ERROR_MESSAGE_LOG = "Failed procession flow file {} due to {}";
public static final String INFLUX_DB_FAIL_TO_INSERT = "Failed to insert into influxDB due to {}";
public static final String INFLUX_DB_FAIL_TO_QUERY = "Failed to execute Flux query due {} to {}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.influxdata.nifi.processors.internal;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

import com.influxdb.LogLevel;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.exceptions.InfluxException;
import org.influxdata.nifi.services.InfluxDatabaseService_2;

import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -36,6 +39,7 @@
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import retrofit2.Response;

import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE;
import static org.influxdata.nifi.util.PropertyValueUtils.getEnumValue;
Expand Down Expand Up @@ -196,6 +200,25 @@ public synchronized InfluxDBClient getInfluxDBClient(final ProcessContext contex
return influxDBClient.get();
}

@Nullable
protected String getRetryAfterHeader(InfluxException ie) {
try {
//
// Temporally solution before release https://github.com/influxdata/influxdb-client-java/pull/317
//
Field responseField = InfluxException.class.getDeclaredField("response");
responseField.setAccessible(true);
Response response = (Response) responseField.get(ie);
if (response != null) {
return response.headers().get("Retry-After");
}
} catch (Exception e) {
return null;
}

return null;
}

/**
* Configure LogLevel and GZIP.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.influxdata.nifi.processors;

import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -34,12 +35,16 @@
import org.influxdata.nifi.util.InfluxDBUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import static org.influxdata.nifi.processors.Utils.createErrorResponse;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_RETRY;
import static org.influxdata.nifi.util.InfluxDBUtils.AT_LEAST_ONE_FIELD_DEFINED_MESSAGE;
import static org.influxdata.nifi.util.InfluxDBUtils.MissingItemsBehaviour.FAIL;
import static org.influxdata.nifi.util.InfluxDBUtils.MissingItemsBehaviour.IGNORE;
import static org.junit.Assert.assertEquals;

/**
* @author Jakub Bednar (bednar@github) (18/07/2019 07:24)
Expand Down Expand Up @@ -127,6 +132,19 @@ public void tooManyRequestsException() {
new InfluxException(createErrorResponse(429)), "Simulate error: 429");
}

@Test
public void tooManyRequestsExceptionRetryAfterHeader() {

Map<String, String> headers = Collections.singletonMap("Retry-After", "145");

errorToRetryRelationship(
new InfluxException(createErrorResponse(429, headers)), "Simulate error: 429");

List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_RETRY);

assertEquals("145", flowFiles.get(0).getAttribute(INFLUX_DB_RETRY_AFTER));
}

@Test
public void notFoundException() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApiBlocking;
Expand All @@ -41,6 +43,7 @@
import static org.influxdata.nifi.processors.Utils.createErrorResponse;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.CHARSET;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.INFLUX_DB_RETRY_AFTER;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.MAX_RECORDS_SIZE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_FAILURE;
import static org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor.REL_MAX_SIZE_EXCEEDED;
Expand Down Expand Up @@ -253,6 +256,25 @@ public void testWriteTooManyRequestsExceptionRetry() {
assertEquals("Simulate error: 429", flowFiles.get(0).getAttribute(INFLUX_DB_ERROR_MESSAGE));
}

@Test
public void testWriteTooManyRequestsExceptionRetryAfterHeader() {

byte[] bytes = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1".getBytes();

Map<String, String> headers = Collections.singletonMap("Retry-After", "149");

Mockito.doThrow(new InfluxException(createErrorResponse(429, headers))).when(mockWriteApi)
.writeRecord(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());

runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(REL_RETRY, 1);

List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(REL_RETRY);

assertEquals("149", flowFiles.get(0).getAttribute(INFLUX_DB_RETRY_AFTER));
}

@Test
public void testCannotInstantiateInfluxDBClient() throws InitializationException, IOException, GeneralSecurityException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.influxdata.nifi.processors;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;

import okhttp3.MediaType;
import okhttp3.Protocol;
import okhttp3.Request;
Expand All @@ -24,16 +28,21 @@

class Utils {

static Response createErrorResponse(final int code) {
okhttp3.Response build = new okhttp3.Response.Builder() //
.code(code).addHeader("X-Influx-Error", "Simulate error: " + code)
static Response<Object> createErrorResponse(final int code) {
return createErrorResponse(code, new HashMap<>());
}

static Response<Object> createErrorResponse(final int code, @Nonnull final Map<String, String> headers) {
okhttp3.Response.Builder builder = new okhttp3.Response.Builder() //
.code(code)
.addHeader("X-Influx-Error", "Simulate error: " + code)
.message("Response.error()")
.protocol(Protocol.HTTP_1_1)
.request(new Request.Builder().url("http://localhost/").build())
.build();
.request(new Request.Builder().url("http://localhost/").build());

headers.forEach(builder::addHeader);

return Response
.error(ResponseBody.create(MediaType.parse("application/json"), ""), build);
return Response.error(ResponseBody.create("", MediaType.parse("application/json")), builder.build());
}

}

0 comments on commit 85fce89

Please sign in to comment.