diff --git a/README.md b/README.md
index ffdf914..b639208 100644
--- a/README.md
+++ b/README.md
@@ -83,7 +83,6 @@ When set to `true` a single email with grouped errors will be sent. When set to
## Todo
- Optional inclusion of FlowFile contents.
-- Add batching support in email provenance reporter.
- Add testing.
## License
diff --git a/nifi-provenance-reporting-tasks/pom.xml b/nifi-provenance-reporting-tasks/pom.xml
index 17f20bc..bf34fad 100644
--- a/nifi-provenance-reporting-tasks/pom.xml
+++ b/nifi-provenance-reporting-tasks/pom.xml
@@ -52,7 +52,7 @@
com.fasterxml.jackson.core
jackson-databind
- 2.17.2
+ 2.18.0
org.apache.nifi
diff --git a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
index 21f04ac..8195f37 100644
--- a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
+++ b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
@@ -36,6 +36,7 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,6 +44,13 @@
@Tags({"elasticsearch", "provenance"})
@CapabilityDescription("A provenance reporting task that writes to Elasticsearch")
public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter {
+
+ static final List DEFAULT_PROCESSORS_TYPES_ALLOWLIST = Arrays.asList(
+ "DeleteSFTP", "ExecuteSQLRecord", "ExtendedValidateCsv", "FetchFTP",
+ "FetchSFTP", "FetchSmb", "GenerateFlowFile", "GetFTP", "GetSFTP", "GetSmbFile", "InvokeHTTP", "ListenFTP",
+ "ListFTP", "ListSFTP", "ListSmb", "PutFTP", "PutSFTP", "PutSmbFile"
+ );
+
public static final PropertyDescriptor ELASTICSEARCH_URL = new PropertyDescriptor
.Builder().name("Elasticsearch URL")
.displayName("Elasticsearch URL")
@@ -62,6 +70,15 @@ public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor PROCESSORS_TYPES_ALLOWLIST = new PropertyDescriptor
+ .Builder().name("Processors Types Allowlist")
+ .displayName("Processors Types Allowlist")
+ .description("Specifies a comma-separated list of processors types for which all provenance events "
+ + "will be sent. If the processor type is not in the list, only error events will be sent.")
+ .defaultValue(String.join(",", DEFAULT_PROCESSORS_TYPES_ALLOWLIST))
+ .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.NON_BLANK_VALIDATOR))
+ .build();
+
private final Map esClients = new HashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -85,6 +102,7 @@ public final List getSupportedPropertyDescriptors() {
descriptors = super.getSupportedPropertyDescriptors();
descriptors.add(ELASTICSEARCH_URL);
descriptors.add(ELASTICSEARCH_INDEX);
+ descriptors.add(PROCESSORS_TYPES_ALLOWLIST);
return descriptors;
}
@@ -92,6 +110,9 @@ public void indexEvents(final List