diff --git a/CHANGELOG.md b/CHANGELOG.md index 21a8032..6379686 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ * [23](https://github.com/influxdata/nifi-influxdb-bundle/issues/23): Added support for InfluxDB v2.0 +### Others +* [11](https://github.com/influxdata/nifi-influxdb-bundle/pulls/27): Updated to Apache NiFi 1.10.0 + ## v1.1 [2019-06-06] ### Features diff --git a/nifi-influx-database-processors/pom.xml b/nifi-influx-database-processors/pom.xml index e7706c3..b47601e 100644 --- a/nifi-influx-database-processors/pom.xml +++ b/nifi-influx-database-processors/pom.xml @@ -147,6 +147,12 @@ 2.6.2 test + + + org.assertj + assertj-core + 3.11.1 + diff --git a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java index 20f6701..eda8fb4 100644 --- a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java @@ -37,7 +37,6 @@ import com.influxdb.client.domain.Query; import com.influxdb.exceptions.InfluxException; import com.influxdb.query.FluxRecord; - import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -340,7 +339,7 @@ private void onResponseRecord(final Cancellable cancellable, final FluxRecord fl final RecordSchema recordSchema = record.getSchema(); try { out = session.write(flowFile); - writer = writerFactory.createWriter(getLogger(), recordSchema, out); + writer = writerFactory.createWriter(getLogger(), recordSchema, out, flowFile); writer.beginRecordSet(); } catch (Exception e) { cancellable.cancel(); diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabaseRecord_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabaseRecord_2.java index 49a9020..1a43711 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabaseRecord_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabaseRecord_2.java @@ -23,15 +23,12 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import com.google.common.collect.Lists; import com.influxdb.Cancellable; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.QueryApi; import com.influxdb.client.domain.Query; import com.influxdb.query.FluxRecord; -import org.influxdata.nifi.services.InfluxDatabaseService_2; -import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; - -import com.google.common.collect.Lists; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.ArrayListRecordWriter; import org.apache.nifi.serialization.record.RecordSchema; @@ -40,6 +37,8 @@ import org.apache.nifi.util.MockProcessorInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.influxdata.nifi.services.InfluxDatabaseService_2; +import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; import org.junit.Before; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -70,13 +69,13 @@ public void before() throws IOException, GeneralSecurityException, Initializatio Mockito.doAnswer(invocation -> { if (queryOnErrorValue != null) { //noinspection unchecked - Consumer onError = invocation.getArgumentAt(3, Consumer.class); + Consumer onError = invocation.getArgument(3, Consumer.class); onError.accept(queryOnErrorValue); } queryOnResponseRecords.forEach(record -> { //noinspection unchecked - BiConsumer onRecord = invocation.getArgumentAt(2, BiConsumer.class); + BiConsumer onRecord = invocation.getArgument(2, BiConsumer.class); onRecord.accept(Mockito.mock(Cancellable.class), record); }); @@ -88,7 +87,7 @@ public void before() throws IOException, GeneralSecurityException, Initializatio throw e; } finally { if (!wasException) { - Runnable onComplete = invocation.getArgumentAt(4, Runnable.class); + Runnable onComplete = invocation.getArgument(4, Runnable.class); onComplete.run(); } } diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabase_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabase_2.java index c811308..17b9884 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/AbstractTestGetInfluxDatabase_2.java @@ -22,20 +22,19 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import com.google.common.collect.Lists; import com.influxdb.Cancellable; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.QueryApi; import com.influxdb.client.domain.Query; -import org.influxdata.nifi.services.InfluxDatabaseService_2; -import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; - -import com.google.common.collect.Lists; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessorInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.influxdata.nifi.services.InfluxDatabaseService_2; +import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; import org.junit.Before; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -66,13 +65,13 @@ public void before() throws IOException, GeneralSecurityException, Initializatio Mockito.doAnswer(invocation -> { if (queryOnErrorValue != null) { //noinspection unchecked - Consumer onError = invocation.getArgumentAt(3, Consumer.class); + Consumer onError = invocation.getArgument(3, Consumer.class); onError.accept(queryOnErrorValue); } queryOnResponseRecords.forEach(record -> { //noinspection unchecked - BiConsumer onRecord = invocation.getArgumentAt(2, BiConsumer.class); + BiConsumer onRecord = invocation.getArgument(2, BiConsumer.class); onRecord.accept(Mockito.mock(Cancellable.class), record); }); @@ -84,7 +83,7 @@ public void before() throws IOException, GeneralSecurityException, Initializatio throw e; } finally { if (!wasException) { - Runnable onComplete = invocation.getArgumentAt(4, Runnable.class); + Runnable onComplete = invocation.getArgument(4, Runnable.class); onComplete.run(); } } diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabaseRecord_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabaseRecord_2.java index 486cf7f..8449bb5 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabaseRecord_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabaseRecord_2.java @@ -18,20 +18,13 @@ import java.io.ByteArrayInputStream; import java.io.IOException; -import java.math.BigDecimal; import java.sql.Timestamp; import java.time.Instant; import java.util.Arrays; import java.util.List; -import java.util.Map; - -import com.influxdb.client.domain.WritePrecision; -import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; -import org.influxdata.nifi.services.InfluxDatabaseService_2; -import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; import com.google.common.collect.Lists; -import groovy.json.JsonSlurper; +import com.influxdb.client.domain.WritePrecision; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -46,6 +39,12 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.apache.nifi.xml.XMLRecordSetWriter; +import org.assertj.core.api.Assertions; +import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; +import org.influxdata.nifi.services.InfluxDatabaseService_2; +import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; +import org.json.JSONArray; +import org.json.JSONObject; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -198,18 +197,18 @@ public void writeToJson() throws InitializationException { MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetInfluxDatabaseRecord_2.REL_SUCCESS).get(0); String json = new String(runner.getContentAsByteArray(flowFile)); - List> parsedJson = (List>) new JsonSlurper().parseText(json); - - Assert.assertEquals(1, parsedJson.size()); - Assert.assertEquals("_result", parsedJson.get(0).get("result")); - Assert.assertEquals(0, parsedJson.get(0).get("table")); - Assert.assertEquals(0, parsedJson.get(0).get("_start")); - Assert.assertEquals(0, parsedJson.get(0).get("_time")); - Assert.assertTrue((long) parsedJson.get(0).get("_stop") > from.getTime()); - Assert.assertEquals("humidity", parsedJson.get(0).get("_field")); - Assert.assertEquals("newark", parsedJson.get(0).get("city")); - Assert.assertEquals("US", parsedJson.get(0).get("country")); - Assert.assertEquals(BigDecimal.valueOf(0.6D), parsedJson.get(0).get("_value")); + JSONObject parsedJson = (new JSONArray(json)).getJSONObject(0); + + Assert.assertNotNull(parsedJson); + Assert.assertEquals("_result", parsedJson.get("result")); + Assert.assertEquals(0, parsedJson.get("table")); + Assert.assertEquals(0, parsedJson.get("_start")); + Assert.assertEquals(0, parsedJson.get("_time")); + Assert.assertTrue((long) parsedJson.get("_stop") > from.getTime()); + Assert.assertEquals("humidity", parsedJson.get("_field")); + Assert.assertEquals("newark", parsedJson.get("city")); + Assert.assertEquals("US", parsedJson.get("country")); + Assert.assertEquals(0.6D, parsedJson.get("_value")); } @Test @@ -288,7 +287,7 @@ public void invalidFlux() { runner.assertTransferCount(GetInfluxDatabaseRecord_2.REL_FAILURE, 1); MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetInfluxDatabaseRecord_2.REL_FAILURE).get(0); - Assert.assertEquals("type error 1:45-1:51: undefined identifier \"rangex\"", flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)); + Assertions.assertThat(flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)).contains("undefined identifier \"rangex\""); } @Test diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabase_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabase_2.java index 6383bdb..2385610 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITGetInfluxDatabase_2.java @@ -19,12 +19,12 @@ import java.util.Arrays; import com.influxdb.client.domain.WritePrecision; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.assertj.core.api.Assertions; import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; import org.influxdata.nifi.services.InfluxDatabaseService_2; import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; - -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -118,7 +118,7 @@ public void invalidFlux() { runner.assertTransferCount(GetInfluxDatabase_2.REL_FAILURE, 1); MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetInfluxDatabaseRecord_2.REL_FAILURE).get(0); - Assert.assertEquals("type error 1:45-1:51: undefined identifier \"rangex\"", flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)); + Assertions.assertThat(flowFile.getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)).contains("undefined identifier \"rangex\""); } @Test diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITPutInfluxDatabase_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITPutInfluxDatabase_2.java index 29dc304..15b0607 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITPutInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/ITPutInfluxDatabase_2.java @@ -22,12 +22,12 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxTable; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.assertj.core.api.Assertions; import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; import org.influxdata.nifi.services.InfluxDatabaseService_2; import org.influxdata.nifi.services.StandardInfluxDatabaseService_2; - -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -90,8 +90,8 @@ public void testBadFormat() { List flowFiles = runner.getFlowFilesForRelationship(AbstractInfluxDatabaseProcessor.REL_FAILURE); assertEquals(1, flowFiles.size()); - assertEquals("unable to parse 'water,country=US,city=newark': missing fields", - flowFiles.get(0).getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)); + Assertions.assertThat(flowFiles.get(0).getAttribute(AbstractInfluxDatabaseProcessor.INFLUX_DB_ERROR_MESSAGE)) + .isEqualTo("unable to parse 'water,country=US,city=newark': missing fields"); } @Test diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseErrorHandling_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseErrorHandling_2.java index 879aa37..96e8e7d 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseErrorHandling_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseErrorHandling_2.java @@ -21,12 +21,11 @@ import com.influxdb.exceptions.InfluxException; import com.influxdb.exceptions.NotFoundException; -import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2; - import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.nifi.processor.Relationship; import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; +import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2; import org.junit.Assert; import org.junit.Test; @@ -127,10 +126,9 @@ private void verifyError(@NonNull final Exception exception, @NonNull final Stri List errors = logger.getErrorMessages(); // First is formatted message, Second Stack Trace - Assert.assertEquals(2, errors.size()); + Assert.assertEquals(1, errors.size()); Assert.assertTrue(errors.get(0).getArgs()[2].toString().contains("from(bucket:\"my-bucket\") |> range(start: 0) |> last()")); - Assert.assertEquals(errors.get(1).getThrowable().getLocalizedMessage(), message); - Assert.assertTrue(exception.getClass().isAssignableFrom(errors.get(1).getThrowable().getClass())); + Assert.assertEquals(errors.get(0).getArgs()[3], exception.toString()); } } \ No newline at end of file diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecordErrorHandling_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecordErrorHandling_2.java index baa401d..290b46e 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecordErrorHandling_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecordErrorHandling_2.java @@ -21,12 +21,11 @@ import com.influxdb.exceptions.InfluxException; import com.influxdb.exceptions.NotFoundException; -import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2; - import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.nifi.processor.Relationship; import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; +import org.influxdata.nifi.processors.internal.AbstractGetInfluxDatabase_2; import org.junit.Assert; import org.junit.Test; @@ -127,10 +126,9 @@ private void verifyError(@NonNull final Exception exception, @NonNull final Stri List errors = logger.getErrorMessages(); // First is formatted message, Second Stack Trace - Assert.assertEquals(2, errors.size()); + Assert.assertEquals(1, errors.size()); Assert.assertTrue(errors.get(0).getArgs()[2].toString().contains("from(bucket:\"my-bucket\") |> range(start: 0) |> last()")); - Assert.assertEquals(errors.get(1).getThrowable().getLocalizedMessage(), message); - Assert.assertTrue(exception.getClass().isAssignableFrom(errors.get(1).getThrowable().getClass())); + Assert.assertEquals(errors.get(0).getArgs()[3], exception.toString()); } } diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord.java index 5f50eb1..aaf40a3 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord.java @@ -29,8 +29,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import org.influxdata.nifi.util.InfluxDBUtils; - import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import org.apache.commons.lang3.reflect.FieldUtils; @@ -43,6 +41,7 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; +import org.influxdata.nifi.util.InfluxDBUtils; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.junit.Assert; @@ -256,7 +255,7 @@ public void addMapFields() { Assert.assertEquals(1, points.size()); Assert.assertEquals(55.5F, getField("nifi-float", points.get(0))); - Assert.assertEquals(150F, getField("nifi-long", points.get(0))); + Assert.assertEquals(150L, getField("nifi-long", points.get(0))); Assert.assertEquals(true, getField("nifi-boolean", points.get(0))); Assert.assertEquals("string value", getField("nifi-string", points.get(0))); } @@ -315,7 +314,7 @@ public void addArrayFieldFail() { addFieldByType(new Object[]{25L, 55L}, null, dataType, false, PutInfluxDatabaseRecord.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-field; routing to failure")); } @@ -355,7 +354,7 @@ public void addRecordFieldFail() { addFieldRecordType(false, PutInfluxDatabaseRecord.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-field; routing to failure")); } @@ -662,7 +661,7 @@ public void addArrayTagFail() { addTagByType(new Object[]{25L, 55L}, null, dataType, false, PutInfluxDatabaseRecord.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-tag; routing to failure")); } @@ -702,7 +701,7 @@ public void addRecordTagFail() { addTagRecordType(false, PutInfluxDatabaseRecord.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-tag; routing to failure")); } diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling.java index b0cebaa..778a89f 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling.java @@ -22,8 +22,6 @@ import java.util.List; import java.util.Map; -import org.influxdata.nifi.util.InfluxDBUtils; - import avro.shaded.com.google.common.collect.Maps; import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.nifi.processor.Relationship; @@ -31,6 +29,7 @@ import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; +import org.influxdata.nifi.util.InfluxDBUtils; import org.influxdb.InfluxDBException; import org.influxdb.InfluxDBIOException; import org.influxdb.dto.Point; @@ -284,10 +283,10 @@ public void errorRelationshipHasErrorInLog() { List errors = logger.getErrorMessages(); // First is formatted message, Second Stack Trace - Assert.assertEquals(2, errors.size()); + Assert.assertEquals(1, errors.size()); Assert.assertTrue(errors.get(0).getMsg().contains(INFLUX_DB_ERROR_MESSAGE_LOG)); - Assert.assertTrue(errors.get(1).getThrowable() instanceof InfluxDBException.UnableToParseException); + Assert.assertEquals("org.influxdb.InfluxDBException$UnableToParseException: unable to parse", errors.get(0).getArgs()[3]); } @Test @@ -298,10 +297,10 @@ public void retryRelationshipHasErrorInLog() { List errors = logger.getErrorMessages(); // First is formatted message, Second Stack Trace - Assert.assertEquals(2, errors.size()); + Assert.assertEquals(1, errors.size()); Assert.assertTrue(errors.get(0).getMsg().contains(INFLUX_DB_ERROR_MESSAGE_LOG)); - Assert.assertTrue(errors.get(1).getThrowable() instanceof InfluxDBException.AuthorizationFailedException); + Assert.assertEquals("org.influxdb.InfluxDBException$AuthorizationFailedException: authorization failed", errors.get(0).getArgs()[3]); } private void errorToRetryRelationship(@NonNull final Class exceptionType, diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java index 3561d70..afa009d 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecordErrorHandling_2.java @@ -20,19 +20,18 @@ import java.util.List; import java.util.Map; +import avro.shaded.com.google.common.collect.Maps; import com.influxdb.client.write.Point; import com.influxdb.exceptions.InfluxException; import com.influxdb.exceptions.NotFoundException; -import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; -import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor_2; -import org.influxdata.nifi.util.InfluxDBUtils; - -import avro.shaded.com.google.common.collect.Maps; import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.nifi.processor.Relationship; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; +import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; +import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor_2; +import org.influxdata.nifi.util.InfluxDBUtils; import org.junit.Assert; import org.junit.Test; @@ -218,10 +217,10 @@ public void errorRelationshipHasErrorInLog() { List errors = logger.getErrorMessages(); // First is formatted message, Second Stack Trace - Assert.assertEquals(2, errors.size()); + Assert.assertEquals(1, errors.size()); Assert.assertTrue(errors.get(0).getMsg().contains("Failed to insert into influxDB due")); - Assert.assertTrue(errors.get(1).getThrowable() instanceof InfluxException); + Assert.assertEquals("com.influxdb.exceptions.InfluxException: unable to parse", errors.get(0).getArgs()[2]); } @Test @@ -232,10 +231,10 @@ public void retryRelationshipHasErrorInLog() { List errors = logger.getErrorMessages(); // First is formatted message, Second Stack Trace - Assert.assertEquals(2, errors.size()); + Assert.assertEquals(1, errors.size()); Assert.assertTrue(errors.get(0).getMsg().contains("Failed to insert into influxDB due ")); - Assert.assertTrue(errors.get(1).getThrowable() instanceof InfluxException); + Assert.assertEquals("com.influxdb.exceptions.InfluxException: Simulate error: 503", errors.get(0).getArgs()[3]); } private void errorToRetryRelationship(@NonNull final InfluxException exception, diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord_2.java index 143d1bd..3a98728 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestPutInfluxDatabaseRecord_2.java @@ -30,9 +30,6 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; -import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; -import org.influxdata.nifi.util.InfluxDBUtils; - import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import org.apache.commons.lang3.reflect.FieldUtils; @@ -45,6 +42,8 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; +import org.influxdata.nifi.processors.internal.AbstractInfluxDatabaseProcessor; +import org.influxdata.nifi.util.InfluxDBUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -259,7 +258,7 @@ public void addMapFields() { Assert.assertEquals(1, points.size()); Assert.assertEquals(55.5F, getField("nifi-float", points.get(0))); - Assert.assertEquals(150F, getField("nifi-long", points.get(0))); + Assert.assertEquals(150L, getField("nifi-long", points.get(0))); Assert.assertEquals(true, getField("nifi-boolean", points.get(0))); Assert.assertEquals("string value", getField("nifi-string", points.get(0))); } @@ -318,7 +317,7 @@ public void addArrayFieldFail() { addFieldByType(new Object[]{25L, 55L}, null, dataType, false, AbstractInfluxDatabaseProcessor.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-field; routing to failure")); } @@ -358,7 +357,7 @@ public void addRecordFieldFail() { addFieldRecordType(false, AbstractInfluxDatabaseProcessor.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-field; routing to failure")); } @@ -665,7 +664,7 @@ public void addArrayTagFail() { addTagByType(new Object[]{25L, 55L}, null, dataType, false, AbstractInfluxDatabaseProcessor.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-tag; routing to failure")); } @@ -705,7 +704,7 @@ public void addRecordTagFail() { addTagRecordType(false, AbstractInfluxDatabaseProcessor.REL_FAILURE); List messages = logger.getErrorMessages(); - Assert.assertEquals(3, messages.size()); + Assert.assertEquals(2, messages.size()); Assert.assertTrue(messages.get(0).getMsg().contains("Complex value found for nifi-tag; routing to failure")); } diff --git a/nifi-influx-database-serialization-nar/pom.xml b/nifi-influx-database-serialization-nar/pom.xml index 37d14af..44563ca 100644 --- a/nifi-influx-database-serialization-nar/pom.xml +++ b/nifi-influx-database-serialization-nar/pom.xml @@ -34,6 +34,12 @@ org.influxdata.nifi nifi-influx-database-serialization + + org.apache.nifi + nifi-standard-services-api-nar + ${nifi.version} + nar + diff --git a/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolReader.java b/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolReader.java index 7aa9f71..5deb7af 100644 --- a/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolReader.java +++ b/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolReader.java @@ -83,6 +83,7 @@ public void onEnabled(final ConfigurationContext context) { @Override public RecordReader createRecordReader(final Map variables, final InputStream in, + long inputLength, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { diff --git a/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordReader.java b/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordReader.java index 846e22a..96d149a 100644 --- a/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordReader.java +++ b/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordReader.java @@ -66,7 +66,7 @@ public InfluxLineProtocolRecordReader(@Nullable final InputStream in, // tags DataType mapDataType = RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()); - fields.add(new RecordField(TAG_SET, mapDataType, true)); + fields.add(new RecordField(TAG_SET, mapDataType, false)); // fields DataType choiceDataType = RecordFieldType.CHOICE.getChoiceDataType( diff --git a/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordSetWriter.java b/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordSetWriter.java index 28360b5..e8c516b 100644 --- a/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordSetWriter.java +++ b/nifi-influx-database-serialization/src/main/java/org/influxdata/nifi/serialization/InfluxLineProtocolRecordSetWriter.java @@ -22,10 +22,6 @@ import java.util.List; import java.util.Map; -import org.influxdata.nifi.processors.MapperOptions; -import org.influxdata.nifi.util.InfluxDBUtils; -import org.influxdata.nifi.util.PropertyValueUtils; - import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -38,6 +34,9 @@ import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordSchema; +import org.influxdata.nifi.processors.MapperOptions; +import org.influxdata.nifi.util.InfluxDBUtils; +import org.influxdata.nifi.util.PropertyValueUtils; /** * @author Jakub Bednar (bednar@github) (29/05/2019 09:07) @@ -93,7 +92,7 @@ public RecordSchema getSchema(final Map variables, final RecordS } @Override - public RecordSetWriter createWriter(final ComponentLog componentLog, final RecordSchema recordSchema, final OutputStream outputStream) { + public RecordSetWriter createWriter(final ComponentLog componentLog, final RecordSchema recordSchema, final OutputStream outputStream, final Map variables) { return new WriteInfluxLineProtocolResult(outputStream, recordSchema, componentLog, mapperOptions, charSet); } } \ No newline at end of file diff --git a/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolParser.java b/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolParser.java index 63c0789..24e764b 100644 --- a/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolParser.java +++ b/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolParser.java @@ -1025,6 +1025,16 @@ public void parsePointsQuotesInTags() { .validate("t159,label=another a=2i,value=1i 1"); } + @Test + public void tagsAreNotNull() throws NotParsableInlineProtocolData + { + InfluxLineProtocolParser parser = InfluxLineProtocolParser.parse("cpu value=1.0 1000000000"); + + Assert.assertNotNull(parser); + Assert.assertNotNull(parser.getTags()); + Assert.assertTrue(parser.getTags().isEmpty()); + } + private static final class ExpectedResult { private static final Logger LOG = LoggerFactory.getLogger(ExpectedResult.class); @@ -1083,7 +1093,7 @@ private void validate(@NonNull final String... lineProtocols) { Stream.of(lineProtocols).forEach(lineProtocol -> { - InfluxLineProtocolParser parse = null; + InfluxLineProtocolParser parse; try { parse = InfluxLineProtocolParser.parse(lineProtocol).report(); diff --git a/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolReader.java b/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolReader.java index 91a505b..9f5a916 100644 --- a/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolReader.java +++ b/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolReader.java @@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.util.Utf8; +import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema; import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processors.standard.ConvertRecord; @@ -33,6 +34,7 @@ import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -46,7 +48,7 @@ public class TestInfluxLineProtocolReader extends AbstractTestInfluxLineProtocol @Test public void createReaderFromMap() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader reader = readerFactory.createRecordReader(variables, null, logger); + RecordReader reader = readerFactory.createRecordReader(variables, null, -1, logger); Assert.assertNotNull(reader); } @@ -72,7 +74,8 @@ public void schemaNotNull() throws SchemaNotFoundException, MalformedRecordExcep } @Test - public void processIncomingLineProtocolToAVRO() throws InitializationException, IOException { + public void processIncomingLineProtocolToAVRO() throws InitializationException, IOException, MalformedRecordException + { String data = "weather,location=us-midwest " + "field-float=82.5,field-integer=85i,field-bool=True,field-string=\"hello\" 1465839830100400200"; @@ -118,13 +121,20 @@ public void processIncomingLineProtocolToAVRO() throws InitializationException, Assert.assertEquals(4, ((Map) next.get("fields")).size()); Assert.assertEquals(true, ((Map) next.get("fields")).get(new Utf8("field-bool"))); Assert.assertEquals(new Utf8("hello"), ((Map) next.get("fields")).get(new Utf8("field-string"))); - Assert.assertEquals(85.F, ((Map) next.get("fields")).get(new Utf8("field-integer"))); + Assert.assertEquals(85L, ((Map) next.get("fields")).get(new Utf8("field-integer"))); Assert.assertEquals(82.5F, ((Map) next.get("fields")).get(new Utf8("field-float"))); // timestamp Assert.assertEquals(1465839830100400200L, next.get("timestamp")); Assert.assertFalse(avroReader.hasNext()); + + // Read via Embedded reader + AvroReaderWithEmbeddedSchema avroReaderWithEmbeddedSchema = new AvroReaderWithEmbeddedSchema(new ByteArrayInputStream(success.toByteArray())); + Record record = avroReaderWithEmbeddedSchema.nextRecord(); + + Assert.assertNotNull(record); + Assert.assertNull(avroReaderWithEmbeddedSchema.nextRecord()); } @Test diff --git a/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolRecordReader.java b/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolRecordReader.java index 9d78c53..bdf5515 100644 --- a/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolRecordReader.java +++ b/nifi-influx-database-serialization/src/test/java/org/influxdata/nifi/serialization/TestInfluxLineProtocolRecordReader.java @@ -50,7 +50,7 @@ public class TestInfluxLineProtocolRecordReader extends AbstractTestInfluxLinePr @Test public void schemaNotNull() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertNotNull(schema); @@ -59,7 +59,7 @@ public void schemaNotNull() throws SchemaNotFoundException, MalformedRecordExcep @Test public void recordNotNull() throws MalformedRecordException, IOException, SchemaNotFoundException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), -1,logger); Record record = recordReader.nextRecord(); Assert.assertNotNull(record); @@ -74,7 +74,7 @@ public void recordMultiple() throws SchemaNotFoundException, MalformedRecordExce String data = "weather temperature=82 1465839830100400200" + System.lineSeparator() + "weather temperature=80 1465839830100450200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Assert.assertNotNull(record); @@ -88,7 +88,7 @@ public void closeInputStream() throws SchemaNotFoundException, MalformedRecordEx InputStream spiedInputStream = Mockito.spy(toInputData()); - RecordReader recordReader = readerFactory.createRecordReader(variables, spiedInputStream, logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, spiedInputStream, -1, logger); recordReader.close(); Mockito.verify(spiedInputStream, Mockito.times(1)).close(); @@ -97,7 +97,7 @@ public void closeInputStream() throws SchemaNotFoundException, MalformedRecordEx @Test public void emptyData() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(" "), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(" "), -1, logger); Assert.assertNull(recordReader.nextRecord()); } @@ -105,7 +105,7 @@ public void emptyData() throws SchemaNotFoundException, MalformedRecordException @Test public void measurement() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(STRING, schema.getDataType(InfluxLineProtocolRecordReader.MEASUREMENT).get().getFieldType()); @@ -119,7 +119,7 @@ public void measurementWithEscapedSpaces() throws SchemaNotFoundException, Malfo String data = "wea\\ ther,location=us-midwest temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Assert.assertEquals("wea ther", record.getValue(InfluxLineProtocolRecordReader.MEASUREMENT)); @@ -130,7 +130,7 @@ public void measurementWithEscapedComma() throws SchemaNotFoundException, Malfor String data = "wea\\,ther,location=us-midwest temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Assert.assertEquals("wea,ther", record.getValue(InfluxLineProtocolRecordReader.MEASUREMENT)); @@ -139,7 +139,7 @@ public void measurementWithEscapedComma() throws SchemaNotFoundException, Malfor @Test public void tags() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.TAG_SET).get().getFieldType()); @@ -156,7 +156,7 @@ public void tagsEmpty() throws SchemaNotFoundException, MalformedRecordException String data = "weather temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.TAG_SET).get().getFieldType()); @@ -172,7 +172,7 @@ public void tagsMultiple() throws SchemaNotFoundException, MalformedRecordExcept String data = "weather,location=us-midwest,season=summer temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.TAG_SET).get().getFieldType()); @@ -190,7 +190,7 @@ public void tagWithEscapedComma() throws SchemaNotFoundException, MalformedRecor String data = "weather,location=us\\,midwest temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map tags = (Map) record.getValue(InfluxLineProtocolRecordReader.TAG_SET); @@ -204,7 +204,7 @@ public void tagWithEscapedEqual() throws SchemaNotFoundException, MalformedRecor String data = "weather,loca\\=tion=us-midwest temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map tags = (Map) record.getValue(InfluxLineProtocolRecordReader.TAG_SET); @@ -218,7 +218,7 @@ public void tagWithEscapedSpace() throws SchemaNotFoundException, MalformedRecor String data = "weather,location=us\\ midwest temperature=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map tags = (Map) record.getValue(InfluxLineProtocolRecordReader.TAG_SET); @@ -230,7 +230,7 @@ public void tagWithEscapedSpace() throws SchemaNotFoundException, MalformedRecor @Test public void fields() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.FIELD_SET).get().getFieldType()); @@ -247,7 +247,7 @@ public void fieldsEmpty() throws SchemaNotFoundException, MalformedRecordExcepti String data = "weather 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); expectedException.expect(MalformedRecordException.class); expectedException.expectMessage("Not parsable data: 'weather 1465839830100400200'"); @@ -260,7 +260,7 @@ public void fieldsWrongFormatEmpty() throws SchemaNotFoundException, MalformedRe String data = "weather value 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); expectedException.expect(MalformedRecordException.class); expectedException.expectMessage("Not parsable data: 'weather value 1465839830100400200'"); @@ -273,7 +273,7 @@ public void fieldFloat() throws SchemaNotFoundException, MalformedRecordExceptio String data = "weather value=82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.FIELD_SET).get().getFieldType()); @@ -290,7 +290,7 @@ public void fieldInteger() throws SchemaNotFoundException, MalformedRecordExcept String data = "weather value=83i 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.FIELD_SET).get().getFieldType()); @@ -310,7 +310,7 @@ public void fieldIntegerLargeThenInteger() throws MalformedRecordException, IOEx + "inodes_total=9223372036854775807i,inodes_free=9223372036853280100i,inodes_used=1495707i " + "1525932900000000000"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); @@ -326,7 +326,7 @@ public void fieldString() throws SchemaNotFoundException, MalformedRecordExcepti String data = "weather value=\"84\" 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.FIELD_SET).get().getFieldType()); @@ -344,7 +344,7 @@ public void fieldBoolean() throws SchemaNotFoundException, MalformedRecordExcept String data = "weather true1=t,true2=T,true3=true,true4=True,true5=TRUE," + "false1=f,false2=F,false3=false,false4=False,false5=FALSE 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.FIELD_SET).get().getFieldType()); @@ -370,7 +370,7 @@ public void fieldOnly() throws SchemaNotFoundException, MalformedRecordException String data = "weather value=84"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(MAP, schema.getDataType(InfluxLineProtocolRecordReader.FIELD_SET).get().getFieldType()); @@ -391,7 +391,7 @@ public void fieldNotParsable() throws SchemaNotFoundException, MalformedRecordEx String data = "weather,location=us-midwest temperature=\"82 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); expectedException.expect(MalformedRecordException.class); expectedException @@ -405,7 +405,7 @@ public void fieldWithEscapedComma() throws SchemaNotFoundException, MalformedRec String data = "weather,location=us-midwest temperature=82,measure=\"Cel\\,sius\" 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map fields = (Map) record.getValue(InfluxLineProtocolRecordReader.FIELD_SET); @@ -419,7 +419,7 @@ public void fieldWithEscapedEqual() throws SchemaNotFoundException, MalformedRec String data = "weather,location=us-midwest temperature=82,measure=\"Cel\\=sius\" 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map fields = (Map) record.getValue(InfluxLineProtocolRecordReader.FIELD_SET); @@ -433,7 +433,7 @@ public void fieldWithEscapedSpace() throws SchemaNotFoundException, MalformedRec String data = "weather,location=us-midwest temperature=82,measure=\"Cel\\ sius\" 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map fields = (Map) record.getValue(InfluxLineProtocolRecordReader.FIELD_SET); @@ -447,7 +447,7 @@ public void fieldWithSpace1() throws SchemaNotFoundException, MalformedRecordExc String data = "system,host=pikachu.local uptime_format=\"7 days, 5:38\" 1525951350000000000"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map fields = (Map) record.getValue(InfluxLineProtocolRecordReader.FIELD_SET); @@ -461,7 +461,7 @@ public void fieldWithSpace2() throws SchemaNotFoundException, MalformedRecordExc String data = "system,host=tomcat2 uptime_format=\" 2:25\" 1526037880000000000"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map fields = (Map) record.getValue(InfluxLineProtocolRecordReader.FIELD_SET); @@ -475,7 +475,7 @@ public void fieldWithEscapedQuotes() throws SchemaNotFoundException, MalformedRe String data = "weather,location=us-midwest temperature=82,measure=\"Cel\\\\\"sius\" 1465839830100400200"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); Record record = recordReader.nextRecord(); Map fields = (Map) record.getValue(InfluxLineProtocolRecordReader.FIELD_SET); @@ -487,7 +487,7 @@ public void fieldWithEscapedQuotes() throws SchemaNotFoundException, MalformedRe @Test public void timestamp() throws SchemaNotFoundException, MalformedRecordException, IOException { - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(LONG, schema.getDataType(InfluxLineProtocolRecordReader.TIMESTAMP).get().getFieldType()); @@ -501,7 +501,7 @@ public void timestampInRFC3339() throws SchemaNotFoundException, MalformedRecord String data = "weather,location=us-midwest temperature=82 2016-10-31T06:52:20.020Z"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(LONG, schema.getDataType(InfluxLineProtocolRecordReader.TIMESTAMP).get().getFieldType()); @@ -517,7 +517,7 @@ public void timestampInWrongFormat() throws SchemaNotFoundException, MalformedRe String data = "weather,location=us-midwest temperature=82 wrong_format"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); expectedException.expect(MalformedRecordException.class); expectedException @@ -531,7 +531,7 @@ public void timestampEmpty() throws SchemaNotFoundException, MalformedRecordExce String data = "weather,location=us-midwest temperature=82"; - RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), logger); + RecordReader recordReader = readerFactory.createRecordReader(variables, toInputData(data), -1,logger); RecordSchema schema = recordReader.getSchema(); Assert.assertEquals(LONG, schema.getDataType(InfluxLineProtocolRecordReader.TIMESTAMP).get().getFieldType()); diff --git a/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/AbstractTestStandardInfluxDatabaseService.java b/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/AbstractTestStandardInfluxDatabaseService.java index b50a101..fcc5d05 100644 --- a/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/AbstractTestStandardInfluxDatabaseService.java +++ b/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/AbstractTestStandardInfluxDatabaseService.java @@ -54,8 +54,8 @@ protected void setUp(@NonNull final Supplier answerConnect) throws Excep // Mock response Mockito.doAnswer(invocation -> answerConnect.get().answer(invocation)) .when(service) - .connect(Mockito.anyString(), - Mockito.anyString(), + .connect(Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), diff --git a/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/TestStandardInfluxDatabaseServiceSettings.java b/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/TestStandardInfluxDatabaseServiceSettings.java index 20b20c8..e9f490a 100644 --- a/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/TestStandardInfluxDatabaseServiceSettings.java +++ b/nifi-influx-database-services/src/test/java/org/influxdata/nifi/services/TestStandardInfluxDatabaseServiceSettings.java @@ -21,6 +21,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; import org.influxdb.InfluxDB; import org.junit.Before; import org.junit.Test; @@ -63,6 +64,17 @@ public void sslContextService() throws InitializationException, IOException, Gen SSLContextService sslContextService = Mockito.mock(SSLContextService.class); when(sslContextService.getIdentifier()).thenReturn("inluxdb-ssl"); + + when(sslContextService.getTrustStoreType()).thenReturn(StandardSSLContextService.STORE_TYPE_JKS); + when(sslContextService.getTrustStoreFile()).thenReturn("src/test/resources/ssl/truststore.jks"); + when(sslContextService.getTrustStorePassword()).thenReturn("changeme"); + when(sslContextService.isTrustStoreConfigured()).thenReturn(true); + + when(sslContextService.getKeyStoreFile()).thenReturn("src/test/resources/ssl/keystore.jks"); + when(sslContextService.getKeyStorePassword()).thenReturn("changeme"); + when(sslContextService.getKeyStoreType()).thenReturn(StandardSSLContextService.STORE_TYPE_JKS); + when(sslContextService.isKeyStoreConfigured()).thenReturn(true); + testRunner.addControllerService("inluxdb-ssl", sslContextService); testRunner.enableControllerService(sslContextService); @@ -103,7 +115,7 @@ public void clientAuth() throws IOException, GeneralSecurityException { @Test public void url() throws IOException, GeneralSecurityException { - testRunner.setProperty(service, InfluxDatabaseService.INFLUX_DB_URL, "http://influxdb:8886"); + testRunner.setProperty(service, InfluxDatabaseService.INFLUX_DB_URL, "http://localhost:8886"); testRunner.assertValid(service); testRunner.enableControllerService(service); @@ -114,7 +126,7 @@ public void url() throws IOException, GeneralSecurityException { Mockito.eq(null), Mockito.eq(null), Mockito.eq(InfluxDatabaseService.DEFAULT_CLIENT_AUTH), - Mockito.eq("http://influxdb:8886"), + Mockito.eq("http://localhost:8886"), Mockito.eq(0L)); } diff --git a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapper.java b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapper.java index f6519d7..497c7bc 100644 --- a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapper.java +++ b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapper.java @@ -24,15 +24,14 @@ import java.util.List; import java.util.Map; -import org.influxdata.nifi.processors.MapperOptions; -import org.influxdata.nifi.processors.RecordToPointMapper; - import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.influxdata.nifi.processors.MapperOptions; +import org.influxdata.nifi.processors.RecordToPointMapper; import org.influxdb.dto.Point; import org.junit.Assert; import org.junit.Before; @@ -312,7 +311,7 @@ public void map() { List points = mapper.mapRecord(record); Assert.assertEquals(1, points.size()); - Assert.assertEquals("measurement-test,tag1=1970-01-02,tag2=555 boolean=true,field1=25.0,field2=true,float=55.5,long=150.0,string=\"string value\" 123456789", points.get(0).lineProtocol()); + Assert.assertEquals("measurement-test,tag1=1970-01-02,tag2=555 boolean=true,field1=25.0,field2=true,float=55.5,long=150i,string=\"string value\" 123456789", points.get(0).lineProtocol()); } @Test diff --git a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapperV2.java b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapperV2.java index 7911a7e..57e8b34 100644 --- a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapperV2.java +++ b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/TestRecordToPointMapperV2.java @@ -26,15 +26,14 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; -import org.influxdata.nifi.processors.MapperOptions; -import org.influxdata.nifi.processors.RecordToPointMapper; - import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.influxdata.nifi.processors.MapperOptions; +import org.influxdata.nifi.processors.RecordToPointMapper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -313,7 +312,7 @@ public void map() { List points = mapper.mapRecordV2(record); Assert.assertEquals(1, points.size()); - Assert.assertEquals("measurement-test,tag1=1970-01-02,tag2=555 boolean=true,field1=25.0,field2=true,float=55.5,long=150.0,string=\"string value\" 123456789", points.get(0).toLineProtocol()); + Assert.assertEquals("measurement-test,tag1=1970-01-02,tag2=555 boolean=true,field1=25.0,field2=true,float=55.5,long=150i,string=\"string value\" 123456789", points.get(0).toLineProtocol()); } @Test diff --git a/pom.xml b/pom.xml index c202aa6..d28b9aa 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ org.apache.nifi nifi-nar-bundles - 1.9.2 + 1.10.0 nifi-influx-database-bundle @@ -101,7 +101,7 @@ UTF-8 - 1.9.2 + 1.10.0 2.22.0 0.8.2 @@ -204,13 +204,13 @@ org.influxdb influxdb-java - 2.15 + 2.17 com.influxdb influxdb-client-java - 1.2.0 + 1.4.0 diff --git a/scripts/Dockerfile b/scripts/Dockerfile index 0020682..08314a9 100644 --- a/scripts/Dockerfile +++ b/scripts/Dockerfile @@ -15,7 +15,7 @@ # limitations under the License. # -ARG NIFI_IMAGE=apache/nifi:1.9.2 +ARG NIFI_IMAGE=apache/nifi:1.10.0 FROM $NIFI_IMAGE diff --git a/scripts/flow.xml b/scripts/flow.xml index 60ec4e0..c6c3aa5 100644 --- a/scripts/flow.xml +++ b/scripts/flow.xml @@ -212,7 +212,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -245,7 +245,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -281,7 +281,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -382,7 +382,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -429,7 +429,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -468,7 +468,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -501,7 +501,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -537,7 +537,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -570,7 +570,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -657,7 +657,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -744,7 +744,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -780,7 +780,7 @@ Set as Line Protocol. org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -1190,7 +1190,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-http-context-map-nar - 1.9.2 + 1.10.0 true @@ -1210,7 +1210,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -1286,7 +1286,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-http-context-map-nar - 1.9.2 + 1.10.0 true @@ -1306,7 +1306,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -1343,7 +1343,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -1438,7 +1438,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -1507,7 +1507,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -1655,7 +1655,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -1707,7 +1707,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-social-media-nar - 1.9.2 + 1.10.0 1 0 sec @@ -2042,7 +2042,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 5 sec @@ -2099,7 +2099,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -2151,7 +2151,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-kafka-0-10-nar - 1.9.2 + 1.10.0 1 0 sec @@ -2236,7 +2236,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-update-attribute-nar - 1.9.2 + 1.10.0 1 0 sec @@ -2394,7 +2394,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -2659,7 +2659,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -2781,7 +2781,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -2826,7 +2826,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-registry-nar - 1.9.2 + 1.10.0 true @@ -2868,7 +2868,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -2921,7 +2921,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -3069,7 +3069,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -3192,7 +3192,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -3244,7 +3244,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-standard-nar - 1.9.2 + 1.10.0 1 0 sec @@ -3533,7 +3533,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 false @@ -3602,7 +3602,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -3654,7 +3654,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -3756,7 +3756,7 @@ curl -i -X GET -G http://localhost:8234 \ org.apache.nifi nifi-record-serialization-services-nar - 1.9.2 + 1.10.0 true @@ -3933,7 +3933,7 @@ curl -i -X GET -G http://localhost:8234 \ nifi-standard-nar org.apache.nifi - 1.9.2 + 1.10.0 WARN @@ -4051,7 +4051,7 @@ curl -i -X GET -G http://localhost:8234 \ nifi-social-media-nar org.apache.nifi - 1.9.2 + 1.10.0 WARN diff --git a/scripts/influxdb-restart.sh b/scripts/influxdb-restart.sh index b810283..c70ddd0 100755 --- a/scripts/influxdb-restart.sh +++ b/scripts/influxdb-restart.sh @@ -30,9 +30,11 @@ DEFAULT_INFLUXDB_VERSION="1.7" INFLUXDB_VERSION="${INFLUXDB_VERSION:-$DEFAULT_INFLUXDB_VERSION}" INFLUXDB_IMAGE=influxdb:${INFLUXDB_VERSION}-alpine -DEFAULT_INFLUXDB_V2_VERSION="nightly" +DEFAULT_INFLUXDB_V2_REPOSITORY="influxdb" +DEFAULT_INFLUXDB_V2_VERSION="2.0.0-beta" +INFLUXDB_V2_REPOSITORY="${INFLUXDB_V2_REPOSITORY:-$DEFAULT_INFLUXDB_V2_REPOSITORY}" INFLUXDB_V2_VERSION="${INFLUXDB_V2_VERSION:-$DEFAULT_INFLUXDB_V2_VERSION}" -INFLUXDB_V2_IMAGE=${DOCKER_REGISTRY}influx:${INFLUXDB_V2_VERSION} +INFLUXDB_V2_IMAGE=${DOCKER_REGISTRY}${INFLUXDB_V2_REPOSITORY}:${INFLUXDB_V2_VERSION} SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )" diff --git a/scripts/nifi-restart.sh b/scripts/nifi-restart.sh index 1a71885..cd45ec1 100755 --- a/scripts/nifi-restart.sh +++ b/scripts/nifi-restart.sh @@ -1,3 +1,4 @@ +#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -15,7 +16,6 @@ # limitations under the License. # -#!/usr/bin/env bash set -e @@ -23,7 +23,7 @@ DEFAULT_INFLUXDB_VERSION="1.7" INFLUXDB_VERSION="${INFLUXDB_VERSION:-$DEFAULT_INFLUXDB_VERSION}" INFLUXDB_IMAGE=influxdb:${INFLUXDB_VERSION}-alpine -DEFAULT_NIFI_VERSION="1.9.2" +DEFAULT_NIFI_VERSION="1.10.0" NIFI_VERSION="${NIFI_VERSION:-$DEFAULT_NIFI_VERSION}" NIFI_IMAGE=apache/nifi:${NIFI_VERSION}