Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
Update to Apache NiFi 1.10 (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Jan 29, 2020
1 parent d9902fe commit 8563061
Show file tree
Hide file tree
Showing 30 changed files with 226 additions and 191 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

* [23](https://github.com/influxdata/nifi-influxdb-bundle/issues/23): Added support for InfluxDB v2.0

### Others
* [11](https://github.com/influxdata/nifi-influxdb-bundle/pulls/27): Updated to Apache NiFi 1.10.0

## v1.1 [2019-06-06]

### Features
Expand Down
6 changes: 6 additions & 0 deletions nifi-influx-database-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@
<version>2.6.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.11.1</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.influxdb.client.domain.Query;
import com.influxdb.exceptions.InfluxException;
import com.influxdb.query.FluxRecord;

import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
Expand Down Expand Up @@ -340,7 +339,7 @@ private void onResponseRecord(final Cancellable cancellable, final FluxRecord fl
final RecordSchema recordSchema = record.getSchema();
try {
out = session.write(flowFile);
writer = writerFactory.createWriter(getLogger(), recordSchema, out);
writer = writerFactory.createWriter(getLogger(), recordSchema, out, flowFile);
writer.beginRecordSet();
} catch (Exception e) {
cancellable.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import com.google.common.collect.Lists;
import com.influxdb.Cancellable;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.Query;
import com.influxdb.query.FluxRecord;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;

import com.google.common.collect.Lists;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.ArrayListRecordWriter;
import org.apache.nifi.serialization.record.RecordSchema;
Expand All @@ -40,6 +37,8 @@
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -70,13 +69,13 @@ public void before() throws IOException, GeneralSecurityException, Initializatio
Mockito.doAnswer(invocation -> {
if (queryOnErrorValue != null) {
//noinspection unchecked
Consumer<Exception> onError = invocation.getArgumentAt(3, Consumer.class);
Consumer<Exception> onError = invocation.getArgument(3, Consumer.class);
onError.accept(queryOnErrorValue);
}

queryOnResponseRecords.forEach(record -> {
//noinspection unchecked
BiConsumer<Cancellable, FluxRecord> onRecord = invocation.getArgumentAt(2, BiConsumer.class);
BiConsumer<Cancellable, FluxRecord> onRecord = invocation.getArgument(2, BiConsumer.class);
onRecord.accept(Mockito.mock(Cancellable.class), record);
});

Expand All @@ -88,7 +87,7 @@ public void before() throws IOException, GeneralSecurityException, Initializatio
throw e;
} finally {
if (!wasException) {
Runnable onComplete = invocation.getArgumentAt(4, Runnable.class);
Runnable onComplete = invocation.getArgument(4, Runnable.class);
onComplete.run();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import com.google.common.collect.Lists;
import com.influxdb.Cancellable;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.Query;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;

import com.google.common.collect.Lists;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -66,13 +65,13 @@ public void before() throws IOException, GeneralSecurityException, Initializatio
Mockito.doAnswer(invocation -> {
if (queryOnErrorValue != null) {
//noinspection unchecked
Consumer<Exception> onError = invocation.getArgumentAt(3, Consumer.class);
Consumer<Exception> onError = invocation.getArgument(3, Consumer.class);
onError.accept(queryOnErrorValue);
}

queryOnResponseRecords.forEach(record -> {
//noinspection unchecked
BiConsumer<Cancellable, String> onRecord = invocation.getArgumentAt(2, BiConsumer.class);
BiConsumer<Cancellable, String> onRecord = invocation.getArgument(2, BiConsumer.class);
onRecord.accept(Mockito.mock(Cancellable.class), record);
});

Expand All @@ -84,7 +83,7 @@ public void before() throws IOException, GeneralSecurityException, Initializatio
throw e;
} finally {
if (!wasException) {
Runnable onComplete = invocation.getArgumentAt(4, Runnable.class);
Runnable onComplete = invocation.getArgument(4, Runnable.class);
onComplete.run();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.influxdb.client.domain.WritePrecision;
import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;

import com.google.common.collect.Lists;
import groovy.json.JsonSlurper;
import com.influxdb.client.domain.WritePrecision;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
Expand All @@ -46,6 +39,12 @@
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.xml.XMLRecordSetWriter;
import org.assertj.core.api.Assertions;
import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -198,18 +197,18 @@ public void writeToJson() throws InitializationException {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetInfluxDatabaseRecord_2.REL_SUCCESS).get(0);

String json = new String(runner.getContentAsByteArray(flowFile));
List<Map<String, Object>> parsedJson = (List<Map<String, Object>>) new JsonSlurper().parseText(json);

Assert.assertEquals(1, parsedJson.size());
Assert.assertEquals("_result", parsedJson.get(0).get("result"));
Assert.assertEquals(0, parsedJson.get(0).get("table"));
Assert.assertEquals(0, parsedJson.get(0).get("_start"));
Assert.assertEquals(0, parsedJson.get(0).get("_time"));
Assert.assertTrue((long) parsedJson.get(0).get("_stop") > from.getTime());
Assert.assertEquals("humidity", parsedJson.get(0).get("_field"));
Assert.assertEquals("newark", parsedJson.get(0).get("city"));
Assert.assertEquals("US", parsedJson.get(0).get("country"));
Assert.assertEquals(BigDecimal.valueOf(0.6D), parsedJson.get(0).get("_value"));
JSONObject parsedJson = (new JSONArray(json)).getJSONObject(0);

Assert.assertNotNull(parsedJson);
Assert.assertEquals("_result", parsedJson.get("result"));
Assert.assertEquals(0, parsedJson.get("table"));
Assert.assertEquals(0, parsedJson.get("_start"));
Assert.assertEquals(0, parsedJson.get("_time"));
Assert.assertTrue((long) parsedJson.get("_stop") > from.getTime());
Assert.assertEquals("humidity", parsedJson.get("_field"));
Assert.assertEquals("newark", parsedJson.get("city"));
Assert.assertEquals("US", parsedJson.get("country"));
Assert.assertEquals(0.6D, parsedJson.get("_value"));
}

@Test
Expand Down Expand Up @@ -288,7 +287,7 @@ public void invalidFlux() {
runner.assertTransferCount(GetInfluxDatabaseRecord_2.REL_FAILURE, 1);

MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetInfluxDatabaseRecord_2.REL_FAILURE).get(0);
Assert.assertEquals("type error 1:45-1:51: undefined identifier \"rangex\"", flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE));
Assertions.assertThat(flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)).contains("undefined identifier \"rangex\"");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import java.util.Arrays;

import com.influxdb.client.domain.WritePrecision;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.assertj.core.api.Assertions;
import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -118,7 +118,7 @@ public void invalidFlux() {
runner.assertTransferCount(GetInfluxDatabase_2.REL_FAILURE, 1);

MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetInfluxDatabaseRecord_2.REL_FAILURE).get(0);
Assert.assertEquals("type error 1:45-1:51: undefined identifier \"rangex\"", flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE));
Assertions.assertThat(flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)).contains("undefined identifier \"rangex\"");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

import com.influxdb.client.domain.WritePrecision;
import com.influxdb.query.FluxTable;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.assertj.core.api.Assertions;
import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor;
import org.influxdata.nifi.services.InfluxDatabaseService_2;
import org.influxdata.nifi.services.StandardInfluxDatabaseService_2;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -90,8 +90,8 @@ public void testBadFormat() {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractInfluxDatabaseProcessor.REL_FAILURE);

assertEquals(1, flowFiles.size());
assertEquals("unable to parse 'water,country=US,city=newark': missing fields",
flowFiles.get(0).getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE));
Assertions.assertThat(flowFiles.get(0).getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE))
.isEqualTo("unable to parse 'water,country=US,city=newark': missing fields");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@

import com.influxdb.exceptions.InfluxException;
import com.influxdb.exceptions.NotFoundException;
import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2;

import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -127,10 +126,9 @@ private void verifyError(@NonNull final Exception exception, @NonNull final Stri
List<LogMessage> errors = logger.getErrorMessages();

// First is formatted message, Second Stack Trace
Assert.assertEquals(2, errors.size());
Assert.assertEquals(1, errors.size());

Assert.assertTrue(errors.get(0).getArgs()[2].toString().contains("from(bucket:\"my-bucket\") |> range(start: 0) |> last()"));
Assert.assertEquals(errors.get(1).getThrowable().getLocalizedMessage(), message);
Assert.assertTrue(exception.getClass().isAssignableFrom(errors.get(1).getThrowable().getClass()));
Assert.assertEquals(errors.get(0).getArgs()[3], exception.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@

import com.influxdb.exceptions.InfluxException;
import com.influxdb.exceptions.NotFoundException;
import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2;

import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -127,10 +126,9 @@ private void verifyError(@NonNull final Exception exception, @NonNull final Stri
List<LogMessage> errors = logger.getErrorMessages();

// First is formatted message, Second Stack Trace
Assert.assertEquals(2, errors.size());
Assert.assertEquals(1, errors.size());

Assert.assertTrue(errors.get(0).getArgs()[2].toString().contains("from(bucket:\"my-bucket\") |> range(start: 0) |> last()"));
Assert.assertEquals(errors.get(1).getThrowable().getLocalizedMessage(), message);
Assert.assertTrue(exception.getClass().isAssignableFrom(errors.get(1).getThrowable().getClass()));
Assert.assertEquals(errors.get(0).getArgs()[3], exception.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.influxdata.nifi.util.InfluxDBUtils;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand All @@ -43,6 +41,7 @@
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.influxdata.nifi.util.InfluxDBUtils;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.junit.Assert;
Expand Down Expand Up @@ -256,7 +255,7 @@ public void addMapFields() {
Assert.assertEquals(1, points.size());

Assert.assertEquals(55.5F, getField("nifi-float", points.get(0)));
Assert.assertEquals(150F, getField("nifi-long", points.get(0)));
Assert.assertEquals(150L, getField("nifi-long", points.get(0)));
Assert.assertEquals(true, getField("nifi-boolean", points.get(0)));
Assert.assertEquals("string value", getField("nifi-string", points.get(0)));
}
Expand Down Expand Up @@ -315,7 +314,7 @@ public void addArrayFieldFail() {
addFieldByType(new Object[]{25L, 55L}, null, dataType, false, PutInfluxDatabaseRecord.REL_FAILURE);

List<LogMessage> messages = logger.getErrorMessages();
Assert.assertEquals(3, messages.size());
Assert.assertEquals(2, messages.size());
Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-field; routing to failure"));
}

Expand Down Expand Up @@ -355,7 +354,7 @@ public void addRecordFieldFail() {
addFieldRecordType(false, PutInfluxDatabaseRecord.REL_FAILURE);

List<LogMessage> messages = logger.getErrorMessages();
Assert.assertEquals(3, messages.size());
Assert.assertEquals(2, messages.size());
Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-field; routing to failure"));
}

Expand Down Expand Up @@ -662,7 +661,7 @@ public void addArrayTagFail() {
addTagByType(new Object[]{25L, 55L}, null, dataType, false, PutInfluxDatabaseRecord.REL_FAILURE);

List<LogMessage> messages = logger.getErrorMessages();
Assert.assertEquals(3, messages.size());
Assert.assertEquals(2, messages.size());
Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-tag; routing to failure"));
}

Expand Down Expand Up @@ -702,7 +701,7 @@ public void addRecordTagFail() {
addTagRecordType(false, PutInfluxDatabaseRecord.REL_FAILURE);

List<LogMessage> messages = logger.getErrorMessages();
Assert.assertEquals(3, messages.size());
Assert.assertEquals(2, messages.size());
Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-tag; routing to failure"));
}

Expand Down
Loading

0 comments on commit 8563061

Please sign in to comment.